mxm commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945871802


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   The above removed lines can be re-added if we  make sure to renew 
`availabilityFuture` after each completion, e.g. add the following after line 
242:
   
   ```
                               availabilityFuture = new CompletableFuture<>();
   ``` 
   
   So this becomes:
   
   ```java
           currentReader
                   .isAvailable()
                   .whenComplete(
                           (result, ex) -> {
                               if (ex == null) {
                                   availabilityFuture.complete(result);
                               } else {
                                   availabilityFuture.completeExceptionally(ex);
                               }
                               availabilityFuture = new CompletableFuture<>();
                           });
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -133,7 +132,17 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 
     @Override
     public CompletableFuture<Void> isAvailable() {
-        return availabilityFuture;
+        availabilityHelper.resetToUnAvailable();
+        if (currentReader == null) {
+            return (CompletableFuture<Void>) 
availabilityHelper.getAvailableFuture();
+        } else {
+            Preconditions.checkArgument(
+                    availabilityHelper.getSize() == 1,
+                    "Availability helper is out of sync for current reader: 
%s",
+                    currentReader);
+            availabilityHelper.anyOf(0, currentReader.isAvailable());
+            return (CompletableFuture<Void>) 
availabilityHelper.getAvailableFuture();
+        }

Review Comment:
   ```suggestion
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });
+        completeAndResetAvailabilityHelper();

Review Comment:
   ```suggestion
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -133,7 +132,17 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 
     @Override
     public CompletableFuture<Void> isAvailable() {
-        return availabilityFuture;
+        availabilityHelper.resetToUnAvailable();
+        if (currentReader == null) {
+            return (CompletableFuture<Void>) 
availabilityHelper.getAvailableFuture();
+        } else {
+            Preconditions.checkArgument(
+                    availabilityHelper.getSize() == 1,
+                    "Availability helper is out of sync for current reader: 
%s",
+                    currentReader);
+            availabilityHelper.anyOf(0, currentReader.isAvailable());
+            return (CompletableFuture<Void>) 
availabilityHelper.getAvailableFuture();
+        }

Review Comment:
   Simply return `availabilityFuture`.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -58,8 +60,9 @@
     private int currentSourceIndex = -1;
     private boolean isFinalSource;
     private SourceReader<T, ? extends SourceSplit> currentReader;
-    private CompletableFuture<Void> availabilityFuture = new 
CompletableFuture<>();
     private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
+    private MultipleFuturesAvailabilityHelper availabilityHelper =
+            new MultipleFuturesAvailabilityHelper(0);

Review Comment:
   I think we can keep using `CompletableFuture` directly with a few 
modifications. Please see the other diffs for how I think this could be done.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   Note that this works also in case of switch events because the reader will 
be closed then which will complete its availability future.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });
+        completeAndResetAvailabilityHelper();

Review Comment:
   This logic won't be required anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to