mxm commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945871802
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
reader.start();
currentSourceIndex = index;
currentReader = reader;
- currentReader
- .isAvailable()
- .whenComplete(
- (result, ex) -> {
- if (ex == null) {
- availabilityFuture.complete(result);
- } else {
- availabilityFuture.completeExceptionally(ex);
- }
- });
Review Comment:
The removed lines above can be re-added if we make sure to renew
`availabilityFuture` after each completion, e.g. by adding the following after
line 242:
```java
availabilityFuture = new CompletableFuture<>();
```
So this becomes:
```java
currentReader
.isAvailable()
.whenComplete(
(result, ex) -> {
if (ex == null) {
availabilityFuture.complete(result);
} else {
availabilityFuture.completeExceptionally(ex);
}
availabilityFuture = new CompletableFuture<>();
});
```
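As a rough, self-contained illustration of the renewal idea (this is not the actual `HybridSourceReader` code; the class `AvailabilityRenewalSketch` and its `forward` method are made-up names for this sketch, and only the availability wiring is modeled):

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the reader; only the availability wiring is modeled. */
final class AvailabilityRenewalSketch {

    // Future handed out to callers of isAvailable(); replaced after each completion.
    private volatile CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    CompletableFuture<Void> isAvailable() {
        return availabilityFuture;
    }

    /** Forwards the outcome of the underlying reader's future, then renews the field. */
    void forward(CompletableFuture<Void> readerAvailable) {
        readerAvailable.whenComplete(
                (result, ex) -> {
                    if (ex == null) {
                        availabilityFuture.complete(result);
                    } else {
                        availabilityFuture.completeExceptionally(ex);
                    }
                    // Same order as in the suggestion: complete first, then renew.
                    availabilityFuture = new CompletableFuture<>();
                });
    }

    public static void main(String[] args) {
        AvailabilityRenewalSketch sketch = new AvailabilityRenewalSketch();
        CompletableFuture<Void> handedOut = sketch.isAvailable();
        CompletableFuture<Void> readerAvailable = new CompletableFuture<>();
        sketch.forward(readerAvailable);

        readerAvailable.complete(null);
        System.out.println("handed-out future done: " + handedOut.isDone());
        System.out.println("renewed future done: " + sketch.isAvailable().isDone());
    }
}
```

In this sketch, the future a caller already holds is the one that gets completed, and the next call to `isAvailable()` returns a fresh, uncompleted future. Whether the complete-then-renew order is safe with respect to callbacks that call `isAvailable()` again would still need to be checked against the real threading model.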
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -133,7 +132,17 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
@Override
public CompletableFuture<Void> isAvailable() {
- return availabilityFuture;
+ availabilityHelper.resetToUnAvailable();
+ if (currentReader == null) {
+ return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+ } else {
+ Preconditions.checkArgument(
+ availabilityHelper.getSize() == 1,
+ "Availability helper is out of sync for current reader: %s",
+ currentReader);
+ availabilityHelper.anyOf(0, currentReader.isAvailable());
+ return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+ }
Review Comment:
```suggestion
```
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
reader.start();
currentSourceIndex = index;
currentReader = reader;
- currentReader
- .isAvailable()
- .whenComplete(
- (result, ex) -> {
- if (ex == null) {
- availabilityFuture.complete(result);
- } else {
- availabilityFuture.completeExceptionally(ex);
- }
- });
+ completeAndResetAvailabilityHelper();
Review Comment:
```suggestion
```
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -133,7 +132,17 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
@Override
public CompletableFuture<Void> isAvailable() {
- return availabilityFuture;
+ availabilityHelper.resetToUnAvailable();
+ if (currentReader == null) {
+ return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+ } else {
+ Preconditions.checkArgument(
+ availabilityHelper.getSize() == 1,
+ "Availability helper is out of sync for current reader: %s",
+ currentReader);
+ availabilityHelper.anyOf(0, currentReader.isAvailable());
+ return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+ }
Review Comment:
Simply return `availabilityFuture`.
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -58,8 +60,9 @@
private int currentSourceIndex = -1;
private boolean isFinalSource;
private SourceReader<T, ? extends SourceSplit> currentReader;
- private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();
private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
+ private MultipleFuturesAvailabilityHelper availabilityHelper =
+ new MultipleFuturesAvailabilityHelper(0);
Review Comment:
I think we can keep using `CompletableFuture` directly with a few
modifications. Please see the other diffs for how I think this could be done.
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
reader.start();
currentSourceIndex = index;
currentReader = reader;
- currentReader
- .isAvailable()
- .whenComplete(
- (result, ex) -> {
- if (ex == null) {
- availabilityFuture.complete(result);
- } else {
- availabilityFuture.completeExceptionally(ex);
- }
- });
Review Comment:
Note that this also works for switch events, because the reader is closed at
that point, which completes its availability future.
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
reader.start();
currentSourceIndex = index;
currentReader = reader;
- currentReader
- .isAvailable()
- .whenComplete(
- (result, ex) -> {
- if (ex == null) {
- availabilityFuture.complete(result);
- } else {
- availabilityFuture.completeExceptionally(ex);
- }
- });
+ completeAndResetAvailabilityHelper();
Review Comment:
This logic won't be required anymore.