wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r792294286
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
|| inputSelectionHandler.areAllInputsFinished()) {
return AVAILABLE;
}
- final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
for (int i = 0; i < inputProcessors.length; i++) {
if (!inputSelectionHandler.isInputFinished(i)
- && inputSelectionHandler.isInputSelected(i)) {
- assertNoException(
- inputProcessors[i]
- .getAvailableFuture()
- .thenRun(() -> anyInputAvailable.complete(null)));
+ && inputSelectionHandler.isInputSelected(i)
+ && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+ return AVAILABLE;
Review comment:
Maybe this shortcut is useless. I will remove it.
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
Review comment:
This issue is related to excessive object creation and to holding references to completed future objects, so this behavior is hard to test without Mockito.
Any suggestions? I'm OK with removing Mockito if we can still verify that no extra future objects are created for an idle input.
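One possible way to check this without Mockito, sketched under assumptions: if the helper caches its pending future, a test can poll an idle input repeatedly and compare the returned references with `==` (or JUnit's `assertSame`). The `CachingAvailability` class and its `markAvailable` method are hypothetical stand-ins, not Flink code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an availability helper; not Flink's actual class.
public class CachingAvailability {
    // Shared "available" sentinel, similar in spirit to AvailabilityProvider.AVAILABLE.
    static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    private CompletableFuture<Void> pending; // created lazily while the input is idle
    private boolean available;

    public CompletableFuture<Void> getAvailableFuture() {
        if (available) {
            return AVAILABLE;
        }
        // Reuse the same pending future on every poll while the input stays idle.
        if (pending == null || pending.isDone()) {
            pending = new CompletableFuture<>();
        }
        return pending;
    }

    public void markAvailable() {
        available = true;
        if (pending != null) {
            pending.complete(null); // wake up whoever is waiting on the cached future
        }
    }

    public static void main(String[] args) {
        CachingAvailability idle = new CachingAvailability();
        CompletableFuture<Void> first = idle.getAvailableFuture();
        boolean noExtraAllocation = true;
        for (int i = 0; i < 1_000; i++) {
            // Identity check: any new allocation would yield a different reference.
            noExtraAllocation &= idle.getAvailableFuture() == first;
        }
        System.out.println("same instance while idle: " + noExtraAllocation);
        idle.markAvailable();
        System.out.println("completed after markAvailable: " + first.isDone());
    }
}
```

Because the assertion is on object identity, the test only touches the helper's public methods and needs no bytecode instrumentation.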
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
}
}
}
+
+ /** Visible for testing only. Do not use outside StreamMultipleInputProcessor. */
+ @VisibleForTesting
+ public static class MultipleInputAvailabilityHelper {
+ private final CompletableFuture<?>[] cachedAvailableFutures;
+ private final Consumer[] onCompletion;
+ private CompletableFuture<?> availableFuture;
+
+ public CompletableFuture<?> getAvailableFuture() {
+ return availableFuture;
+ }
+
+ public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+ MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+ return obj;
+ }
+
+ private MultipleInputAvailabilityHelper(int inputSize) {
+ this.cachedAvailableFutures = new CompletableFuture[inputSize];
+ this.onCompletion = new Consumer[inputSize];
+ }
+
+ @VisibleForTesting
+ public void init() {
+ for (int i = 0; i < cachedAvailableFutures.length; i++) {
+ final int inputIdx = i;
+ onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+ }
+ }
+
+ public boolean isInvalid() {
+ return availableFuture == null || availableFuture.isDone();
+ }
+
+ public void resetToUnavailable() {
+ availableFuture = new CompletableFuture<>();
+ }
+
+ private void notifyCompletion(int idx) {
+ if (availableFuture != null && !availableFuture.isDone()) {
+ availableFuture.complete(null);
+ }
+ cachedAvailableFutures[idx] = AVAILABLE;
+ }
Review comment:
I confirmed that the MultipleInputStream benchmark gets blocked during the test.
This may be related to a race between `notifyCompletion` calls and `resetToUnavailable` calls.
I'm working on a solution.
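One interleaving that would produce such a hang, replayed deterministically on a single thread as a minimal sketch: the task thread sees no available input, the notification arrives while `availableFuture` is still absent (or already done), and `resetToUnavailable` then installs a fresh future that no later notification completes. This assumes each per-input callback fires only once per availability change; `Helper` is a simplified, single-input stand-in for the code above, and the re-check in `fixed()` is one hypothetical remedy, not the PR's final fix.

```java
import java.util.concurrent.CompletableFuture;

// Single-threaded replay of a lost-wakeup interleaving; names are simplified
// stand-ins for the helper in the diff, not Flink's actual code.
public class LostWakeupDemo {
    static class Helper {
        volatile CompletableFuture<Void> availableFuture;
        volatile boolean inputAvailable;

        void resetToUnavailable() {
            availableFuture = new CompletableFuture<>();
        }

        // Called from the thread that makes input data available.
        void notifyCompletion() {
            inputAvailable = true;
            CompletableFuture<Void> f = availableFuture;
            if (f != null && !f.isDone()) {
                f.complete(null);
            }
        }
    }

    // Racy order: check, notification sneaks in, reset. Returns the future the task waits on.
    static CompletableFuture<Void> racy() {
        Helper h = new Helper();
        boolean nothingAvailable = !h.inputAvailable; // task thread sees no data
        h.notifyCompletion();                         // data arrives; no future to complete yet
        if (nothingAvailable) {
            h.resetToUnavailable();                   // fresh future that nobody will complete
        }
        return h.availableFuture;
    }

    // Hypothetical fix: publish the fresh future first, then re-check the condition.
    static CompletableFuture<Void> fixed() {
        Helper h = new Helper();
        boolean nothingAvailable = !h.inputAvailable;
        h.notifyCompletion();
        if (nothingAvailable) {
            h.resetToUnavailable();
            if (h.inputAvailable) {                   // data arrived during the window
                h.availableFuture.complete(null);
            }
        }
        return h.availableFuture;
    }

    public static void main(String[] args) {
        System.out.println("racy future done:  " + racy().isDone());
        System.out.println("fixed future done: " + fixed().isDone());
    }
}
```

The re-check works because the reset happens before the flag is read: either the check sees the notification's flag, or the notification runs later and completes the new future.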
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
StreamOneInputProcessor<?>[] inputProcessors) {
this.inputSelectionHandler = inputSelectionHandler;
this.inputProcessors = inputProcessors;
+ this.availabilityHelper =
+ MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+ this.availabilityHelper.init();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (inputSelectionHandler.isAnyInputAvailable()
+ || inputSelectionHandler.areAllInputsFinished()) {
+ return true;
+ } else {
+ boolean isAvailable = false;
+ for (int i = 0; i < inputProcessors.length; i++) {
+ isAvailable =
+ !inputSelectionHandler.isInputFinished(i)
+ && inputSelectionHandler.isInputSelected(i)
+ && inputProcessors[i].isAvailable();
+ }
+ return isAvailable;
Review comment:
I see your point. The `availableInputsMask` inside `inputSelectionHandler` and the `future` both indicate whether the corresponding input is available. I will make some modifications.
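A minimal sketch of why the two signals overlap, assuming availability is tracked as one bit per input: if the mask is kept in sync with the per-input futures, a single bitwise AND answers the same question as a loop over those futures. `InputMasks` and its methods are hypothetical and only loosely modeled on the idea of `availableInputsMask`.

```java
// Hypothetical bitmask bookkeeping in the spirit of availableInputsMask; the class and its
// methods are illustrative stand-ins, not Flink's actual selection handler.
public class InputMasks {
    private long availableInputsMask;   // bit i set: input i has data
    private long selectedInputsMask;    // bit i set: input i is currently selected
    private long notFinishedInputsMask; // bit i set: input i has not reached end of input

    // Supports 1..64 inputs, one bit per input.
    InputMasks(int inputCount) {
        if (inputCount < 1 || inputCount > 64) {
            throw new IllegalArgumentException("inputCount must be in [1, 64]");
        }
        // Java masks shift distances to 6 bits, so 1L << 64 would be 1; handle 64 explicitly.
        long all = inputCount == 64 ? -1L : (1L << inputCount) - 1;
        this.selectedInputsMask = all;
        this.notFinishedInputsMask = all;
    }

    void setAvailable(int idx, boolean available) {
        if (available) {
            availableInputsMask |= 1L << idx;
        } else {
            availableInputsMask &= ~(1L << idx);
        }
    }

    void setFinished(int idx) {
        notFinishedInputsMask &= ~(1L << idx);
    }

    void select(long mask) {
        selectedInputsMask = mask;
    }

    // A single AND replaces a loop over per-input futures.
    boolean isAnyInputAvailable() {
        return (availableInputsMask & selectedInputsMask & notFinishedInputsMask) != 0;
    }

    public static void main(String[] args) {
        InputMasks masks = new InputMasks(3);
        System.out.println(masks.isAnyInputAvailable()); // false: no data yet
        masks.setAvailable(1, true);
        System.out.println(masks.isAnyInputAvailable()); // true
        masks.select(0b001);
        System.out.println(masks.isAnyInputAvailable()); // false: input 1 not selected
        masks.select(0b111);
        masks.setFinished(1);
        System.out.println(masks.isAnyInputAvailable()); // false: input 1 finished
    }
}
```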
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
Review comment:
I see.
Do you mean I should just remove this unit test, since the private implementation details of `StreamMultipleInputProcessor` cannot be tested?
Is that the right way to understand it?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
|| inputSelectionHandler.areAllInputsFinished()) {
return AVAILABLE;
}
- final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
for (int i = 0; i < inputProcessors.length; i++) {
if (!inputSelectionHandler.isInputFinished(i)
- && inputSelectionHandler.isInputSelected(i)) {
- assertNoException(
- inputProcessors[i]
- .getAvailableFuture()
- .thenRun(() -> anyInputAvailable.complete(null)));
+ && inputSelectionHandler.isInputSelected(i)
+ && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+ return AVAILABLE;
Review comment:
Yes, you are right.
The shortcut `for` loop is redundant with `inputSelectionHandler.isAnyInputAvailable()`.
I will make the change.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
StreamOneInputProcessor<?>[] inputProcessors) {
this.inputSelectionHandler = inputSelectionHandler;
this.inputProcessors = inputProcessors;
+ this.availabilityHelper =
+ MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+ this.availabilityHelper.init();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (inputSelectionHandler.isAnyInputAvailable()
+ || inputSelectionHandler.areAllInputsFinished()) {
+ return true;
+ } else {
+ boolean isAvailable = false;
+ for (int i = 0; i < inputProcessors.length; i++) {
+ isAvailable =
+ !inputSelectionHandler.isInputFinished(i)
+ && inputSelectionHandler.isInputSelected(i)
+ && inputProcessors[i].isAvailable();
+ }
+ return isAvailable;
Review comment:
The original code does not override the `isAvailable` method from `AvailabilityProvider`, whose default implementation calls `getAvailableFuture` first and determines the availability status from the state of that future. The original `getAvailableFuture` creates a new future instance each time someone calls it. So I duplicated the logic here to avoid calling `getAvailableFuture`, in order to reduce the memory footprint.
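The allocation cost described above can be sketched with a simplified model, assuming the default `isAvailable` derives its answer from `getAvailableFuture()` as described: polling through the default method allocates a future per call, while an override that reads plain state allocates nothing. `AvailabilityLike`, the two processor classes, and the `allocations` counter are illustrative stand-ins, not Flink's `AvailabilityProvider`.

```java
import java.util.concurrent.CompletableFuture;

// Simplified model of the pattern; AvailabilityLike stands in for AvailabilityProvider,
// and the allocation counter exists only for illustration.
public class AvoidFutureAllocation {
    interface AvailabilityLike {
        CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

        CompletableFuture<?> getAvailableFuture();

        // Default: derive availability from the future, which may force an allocation.
        default boolean isAvailable() {
            CompletableFuture<?> future = getAvailableFuture();
            return future == AVAILABLE || future.isDone();
        }
    }

    static class AllocatingProcessor implements AvailabilityLike {
        int allocations;
        boolean dataReady;

        @Override
        public CompletableFuture<?> getAvailableFuture() {
            if (dataReady) {
                return AVAILABLE;
            }
            allocations++; // a fresh future on every call, as described for the original code
            return new CompletableFuture<>();
        }
    }

    static class OverridingProcessor extends AllocatingProcessor {
        // Answer from plain state instead of going through getAvailableFuture().
        @Override
        public boolean isAvailable() {
            return dataReady;
        }
    }

    static int allocationsAfterPolling(AllocatingProcessor p, int polls) {
        for (int i = 0; i < polls; i++) {
            p.isAvailable();
        }
        return p.allocations;
    }

    public static void main(String[] args) {
        System.out.println("default isAvailable:    "
                + allocationsAfterPolling(new AllocatingProcessor(), 1000));
        System.out.println("overridden isAvailable: "
                + allocationsAfterPolling(new OverridingProcessor(), 1000));
    }
}
```

The trade-off is the one noted in the comment: the override duplicates the availability logic, so it has to be kept consistent with what `getAvailableFuture` reports.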
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
}
}
}
+
+ /** Visible for testing only. Do not use outside StreamMultipleInputProcessor. */
+ @VisibleForTesting
+ public static class MultipleInputAvailabilityHelper {
+ private final CompletableFuture<?>[] cachedAvailableFutures;
+ private final Consumer[] onCompletion;
+ private CompletableFuture<?> availableFuture;
+
+ public CompletableFuture<?> getAvailableFuture() {
+ return availableFuture;
+ }
+
+ public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+ MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+ return obj;
+ }
+
+ private MultipleInputAvailabilityHelper(int inputSize) {
+ this.cachedAvailableFutures = new CompletableFuture[inputSize];
+ this.onCompletion = new Consumer[inputSize];
+ }
+
+ @VisibleForTesting
+ public void init() {
+ for (int i = 0; i < cachedAvailableFutures.length; i++) {
+ final int inputIdx = i;
+ onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+ }
+ }
+
+ public boolean isInvalid() {
+ return availableFuture == null || availableFuture.isDone();
+ }
+
+ public void resetToUnavailable() {
+ availableFuture = new CompletableFuture<>();
+ }
+
+ private void notifyCompletion(int idx) {
+ if (availableFuture != null && !availableFuture.isDone()) {
+ availableFuture.complete(null);
+ }
+ cachedAvailableFutures[idx] = AVAILABLE;
+ }
Review comment:
I think the race will cause duplicate `complete` calls on the future, but I don't yet know how that would make it get stuck.
I will check these benchmarks and try to find what causes these deadlocks.
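On the duplicate calls specifically: by the `CompletableFuture` contract, `complete` returns `true` only for the call that moves the future to the done state, later calls return `false` and change nothing, and dependent actions run once. So duplicate completions alone should not block; a hang is more plausibly explained by a waiter holding a future that is never completed, which is an assumption still to be confirmed against the benchmark. A small check of that contract:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Shows that repeated complete() calls on one CompletableFuture are harmless no-ops.
public class DuplicateComplete {
    // Completes one future twice and reports what each call did.
    static String completeTwice() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicInteger callbackRuns = new AtomicInteger();
        future.thenRun(callbackRuns::incrementAndGet); // dependent action registered up front

        boolean first = future.complete(null);  // true: this call completed the future
        boolean second = future.complete(null); // false: already done, nothing changes
        return first + " " + second + " " + callbackRuns.get();
    }

    public static void main(String[] args) {
        System.out.println(completeTwice()); // prints "true false 1"
    }
}
```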
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
StreamOneInputProcessor<?>[] inputProcessors) {
this.inputSelectionHandler = inputSelectionHandler;
this.inputProcessors = inputProcessors;
+ this.availabilityHelper =
+ MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+ this.availabilityHelper.init();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (inputSelectionHandler.isAnyInputAvailable()
+ || inputSelectionHandler.areAllInputsFinished()) {
+ return true;
+ } else {
+ boolean isAvailable = false;
+ for (int i = 0; i < inputProcessors.length; i++) {
+ isAvailable =
+ !inputSelectionHandler.isInputFinished(i)
+ && inputSelectionHandler.isInputSelected(i)
+ && inputProcessors[i].isAvailable();
+ }
+ return isAvailable;
Review comment:
This implementation may not be optimal. I may not fully understand the difference between `inputSelectionHandler.isAnyInputAvailable` and the `for` loop below.
This block is borrowed from the original code, replacing the `thenRun` part with an aggregation of the results of `inputProcessors[i].isAvailable()`.
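In the quoted loop, `isAvailable = ...` is reassigned on every iteration, so the `else` branch ends up reporting only the last input. A stand-alone sketch of the aggregation, with plain boolean arrays standing in (as an assumption) for `inputSelectionHandler.isInputFinished(i)`, `inputSelectionHandler.isInputSelected(i)` and `inputProcessors[i].isAvailable()`:

```java
// Hypothetical stand-alone version of the aggregation loop; the boolean arrays stand in
// for the per-input finished / selected / available checks.
public class AnyInputAvailable {
    // As quoted in the diff: each iteration overwrites the result, so only the last input counts.
    static boolean lastInputOnly(boolean[] finished, boolean[] selected, boolean[] available) {
        boolean isAvailable = false;
        for (int i = 0; i < available.length; i++) {
            isAvailable = !finished[i] && selected[i] && available[i];
        }
        return isAvailable;
    }

    // Aggregated: true as soon as any selected, unfinished input is available.
    static boolean anyInput(boolean[] finished, boolean[] selected, boolean[] available) {
        for (int i = 0; i < available.length; i++) {
            if (!finished[i] && selected[i] && available[i]) {
                return true; // short-circuit on the first hit
            }
        }
        return false;
    }

    public static void main(String[] args) {
        boolean[] finished = {false, false};
        boolean[] selected = {true, true};
        boolean[] available = {true, false}; // only input 0 has data
        System.out.println(lastInputOnly(finished, selected, available)); // false
        System.out.println(anyInput(finished, selected, available));      // true
    }
}
```

Using `isAvailable |= ...` inside the original loop would give the same result as the early return, at the cost of always visiting every input.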
--
This is an automated message from the Apache Git Service.