pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r678383894



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/TwoInputSelectionHandler.java
##########
@@ -44,18 +49,27 @@ public TwoInputSelectionHandler(@Nullable InputSelectable inputSelectable) {
 
     void nextSelection() {
         if (inputSelectable == null) {
-            inputSelection = InputSelection.ALL;
+            selectedInputsMask = (int) InputSelection.ALL.getInputMask();
+        } else if (dataFinishedButNotPartition != 0) {
+            selectedInputsMask =
+                    ((int) inputSelectable.nextSelection().getInputMask()
+                            | dataFinishedButNotPartition);
         } else {
-            inputSelection = inputSelectable.nextSelection();
+            selectedInputsMask = (int) inputSelectable.nextSelection().getInputMask();
         }
     }
 
+    public boolean allInputsReceivedEndOfData() {
+        return (dataFinishedButNotPartition | inputsFinishedMask) == 3;
+    }
+
     int selectNextInputIndex(int lastReadInputIndex) {
-        return inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex);
+        return InputSelection.fairSelectNextIndexOutOf2(
+                selectedInputsMask, availableInputsMask, lastReadInputIndex);

Review comment:
       Why are you un-encapsulating `selectedInputsMask` and inlining `InputSelection`?
   
   First of all, in your current proposal all of the production use cases of `InputSelection.fairSelectNextIndex` are static, so, assuming we want to go in this direction, you should make `InputSelection` stateless.
   
   But why can't we keep this encapsulation? Why not add `dataFinishedButNotPartition` to `InputSelection`, for example as `enforcedInputSelection`/`overriddenInputSelection`?
   
   edit:
   Ok, I see. `InputSelection` is `@PublicEvolving`. I think we need to clean this up. The `@PublicEvolving` part of `InputSelection` should be heavily limited. For example, `fairSelectNextIndexXXX` shouldn't be part of it. Probably only `ALL`/`FIRST`/`SECOND` and the builder should stay. The rest should be moved to another wrapper class (`XXXInputSelectionHandler` might be a good choice after all).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -203,8 +221,11 @@ private void updateAvailability() {
 
     private void updateAvailability(
             DataInputStatus status, StreamOneInputProcessor<?> input, int inputIdx) {
-        if (status == DataInputStatus.MORE_AVAILABLE
-                || (status != DataInputStatus.END_OF_INPUT && input.isApproximatelyAvailable())) {
+        if (status == DataInputStatus.END_OF_DATA) {
+            inputSelectionHandler.setDataFinishedOnInput(inputIdx);
+        } else if (status == DataInputStatus.END_OF_INPUT) {
+            inputSelectionHandler.setEndOfPartition(inputIdx);
+        } else if (status == DataInputStatus.MORE_AVAILABLE || input.isApproximatelyAvailable()) {

Review comment:
       You are potentially changing the order of evaluation of the conditions, pushing the `MORE_AVAILABLE` hot path down.
   
   Again, I'm not certain about that, but I personally wouldn't risk it (given the instability of our two/multi-input benchmarks). A performance regression of a couple of percent would be virtually impossible to track down.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
##########
@@ -151,7 +152,11 @@ protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent) {
         final AbstractEvent event = bufferOrEvent.getEvent();
         // TODO: with checkpointedInputGate.isFinished() we might not need to support any events on
         // this level.
-        if (event.getClass() == EndOfPartitionEvent.class) {
+        if (event.getClass() == EndOfData.class) {
+            if (checkpointedInputGate.hasReceivedEndOfData()) {
+                return DataInputStatus.END_OF_DATA;
+            }

Review comment:
       Why is this check at this level and not inside the input gate, if for nothing else then for the sake of consistency with `EndOfPartitionEvent`? It also sounds wrong: we are receiving `EndOfData`, but not really.
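A sketch of what moving the check into the gate could look like, assuming the gate tracks `EndOfData` per channel and hands it upwards only once every channel has received it, so the network input never sees an `EndOfData` that "isn't really" one. `EndOfDataGateSketch` and `onEndOfData()` are hypothetical names, not Flink's `CheckpointedInputGate` API.

```java
import java.util.BitSet;

/** Hypothetical gate-side tracking of EndOfData; not Flink's CheckpointedInputGate. */
final class EndOfDataGateSketch {
    private final int numberOfChannels;
    private final BitSet channelsWithEndOfData = new BitSet();

    EndOfDataGateSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    /**
     * Records EndOfData on one channel. Returns true only for the event that completes
     * the set, so the layer above would see exactly one EndOfData per gate.
     */
    boolean onEndOfData(int channelIndex) {
        boolean wasComplete = hasReceivedEndOfData();
        channelsWithEndOfData.set(channelIndex);
        return !wasComplete && hasReceivedEndOfData();
    }

    boolean hasReceivedEndOfData() {
        return channelsWithEndOfData.cardinality() == numberOfChannels;
    }
}
```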

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -712,21 +740,15 @@ protected void afterInvoke() throws Exception {
         LOG.debug("Finished task {}", getName());
         getCompletionFuture().exceptionally(unused -> null).join();
 
-        final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
-
-        // close all operators in a chain effect way
-        operatorChain.finishOperators(actionExecutor);
-

Review comment:
       You are going to conflict with https://github.com/apache/flink/pull/16556/commits/1655ec94600fcc7bc9ae4722e5783041784510dd (not yet merged, but soon to be part of https://github.com/apache/flink/pull/16556).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -84,8 +84,9 @@ public DataInputStatus processInput() throws Exception {
 
         lastReadInputIndex = readingInputIndex;
         DataInputStatus inputStatus = inputProcessors[readingInputIndex].processInput();
+        inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
         inputSelectionHandler.nextSelection();
-        return inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
+        return inputSelectionHandler.calculateOverallStatus(inputStatus);

Review comment:
       1. Why did you have to reverse the `updateStatus()`/`nextSelection()` order?
   2. I hope the JIT will be clever enough to inline and optimise this code for the `MORE_AVAILABLE` case (I don't know that; maybe point 1 will prevent it).
   3. Can't we keep the `MORE_AVAILABLE` hot path as it was before? I'm not confident whether that's an issue or not, but I intentionally wrote `updateStatus()` the way it was to keep the `MORE_AVAILABLE` path as simple as possible.
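A sketch of an `updateStatus()` that keeps the `MORE_AVAILABLE` path to a single comparison and an early return while still tracking end of data, in the spirit of point 3 (not necessarily the exact previous code). `Status` is a simplified stand-in for `DataInputStatus`; the masks and the overall-status rules are assumptions for illustration, and restoring availability (via futures in the real handler) is not modelled.

```java
/** Hypothetical sketch; not Flink's MultipleInputSelectionHandler. */
final class StatusHotPathSketch {
    // Simplified stand-in for DataInputStatus (assumption).
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_DATA, END_OF_INPUT }

    private final int allInputsMask;
    private int availableInputsMask;
    private int dataFinishedMask;  // inputs that reached END_OF_DATA or END_OF_INPUT
    private int inputFinishedMask; // inputs that reached END_OF_INPUT

    StatusHotPathSketch(int numberOfInputs) {
        allInputsMask = (1 << numberOfInputs) - 1;
        availableInputsMask = allInputsMask;
    }

    Status updateStatus(Status inputStatus, int inputIndex) {
        // Hot path: one comparison and an early return, no mask bookkeeping.
        if (inputStatus == Status.MORE_AVAILABLE) {
            return Status.MORE_AVAILABLE;
        }
        int bit = 1 << inputIndex;
        switch (inputStatus) {
            case NOTHING_AVAILABLE:
                availableInputsMask &= ~bit;
                break;
            case END_OF_DATA:
                dataFinishedMask |= bit;
                break;
            case END_OF_INPUT:
                dataFinishedMask |= bit;
                inputFinishedMask |= bit;
                availableInputsMask &= ~bit;
                break;
            default:
                throw new IllegalStateException("Unexpected status " + inputStatus);
        }
        return calculateOverallStatus();
    }

    // Cold path: only reached for non-MORE_AVAILABLE statuses.
    Status calculateOverallStatus() {
        if (inputFinishedMask == allInputsMask) {
            return Status.END_OF_INPUT;
        }
        if (dataFinishedMask == allInputsMask) {
            return Status.END_OF_DATA;
        }
        return availableInputsMask != 0 ? Status.MORE_AVAILABLE : Status.NOTHING_AVAILABLE;
    }
}
```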

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -463,6 +475,22 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                         new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
     }
 
+    protected void endData() throws Exception {
+        // Suspend the mailbox processor, it would be resumed in afterInvoke and finished
+        // after all records processed by the downstream tasks. We also suspend the default

Review comment:
       On second thought: are we really disabling the default action and suspending anything?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -463,6 +475,22 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                         new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
     }
 
+    protected void endData() throws Exception {
+        // Suspend the mailbox processor, it would be resumed in afterInvoke and finished
+        // after all records processed by the downstream tasks. We also suspend the default

Review comment:
       nitty nit:
   `after all records processed by the downstream tasks` ->
   `after all records are/will be processed by the downstream tasks`

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleInputSelectionHandler.java
##########
@@ -101,20 +114,26 @@ public DataInputStatus calculateOverallStatus() throws IOException {
     }
 
     void nextSelection() {
-        if (inputSelectable == null) {
-            inputSelection = InputSelection.ALL;
+        if (inputSelectable == null || areAllDataInputsFinished()) {
+            selectedInputsMask = InputSelection.ALL.getInputMask();
+        } else if (dataFinishedButNotPartition != 0) {
+            selectedInputsMask =
+                    (inputSelectable.nextSelection().getInputMask() | dataFinishedButNotPartition)
+                            & allSelectedMask;
         } else {
-            inputSelection = inputSelectable.nextSelection();
+            selectedInputsMask = inputSelectable.nextSelection().getInputMask();

Review comment:
       This adds quite a few extra checks on the hot path. I have a feeling we could streamline it by adding an explicit state-transition enum, especially on the `inputSelectable != null` path: `NORMAL` -> `NOT_ALL_DATA_INPUTS_FINISHED` -> `ALL_DATA_INPUTS_FINISHED`? We would still have a switch instead of a single if, but it should keep a single check on the `inputSelectable == null` hot path and add just one extra check for `NORMAL` (pseudo code):
   ```
   switch (...) {
     case inputSelectable == null:
       foo();
       break;
     case NORMAL:
       bar();
       break;
     (...)
   ```
   I would guess those are two simple if checks, while currently for `NORMAL` 
you have to evaluate the following condition:
   ```
   !(inputSelectable == null || areAllDataInputsFinished()) && dataFinishedButNotPartition == 0
   ```
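A minimal sketch of the suggested state machine, assuming the state changes only when an input finishes its data, so `nextSelection()` pays for one `switch` in every state. The three state names follow the comment above; `NO_INPUT_SELECTABLE`, the `IntSupplier` standing in for `inputSelectable.nextSelection().getInputMask()`, and the simplification that end of partition is not tracked are all assumptions.

```java
import java.util.function.IntSupplier;

/** Hypothetical state machine for input selection; not Flink code. */
final class SelectionStateSketch {
    enum State { NO_INPUT_SELECTABLE, NORMAL, NOT_ALL_DATA_INPUTS_FINISHED, ALL_DATA_INPUTS_FINISHED }

    private final IntSupplier inputSelectable; // null when the operator is not InputSelectable
    private final int allSelectedMask;
    private State state;
    private int dataFinishedButNotPartition;
    private int selectedInputsMask;

    SelectionStateSketch(IntSupplier inputSelectable, int numberOfInputs) {
        this.inputSelectable = inputSelectable;
        this.allSelectedMask = (1 << numberOfInputs) - 1;
        this.state = inputSelectable == null ? State.NO_INPUT_SELECTABLE : State.NORMAL;
    }

    // Cold path: state transitions happen only when an input reaches end of data.
    void setDataFinishedOnInput(int inputIndex) {
        dataFinishedButNotPartition |= 1 << inputIndex;
        if (state != State.NO_INPUT_SELECTABLE) {
            state = dataFinishedButNotPartition == allSelectedMask
                    ? State.ALL_DATA_INPUTS_FINISHED
                    : State.NOT_ALL_DATA_INPUTS_FINISHED;
        }
    }

    // Hot path: one switch instead of the compound condition.
    void nextSelection() {
        switch (state) {
            case NO_INPUT_SELECTABLE:
            case ALL_DATA_INPUTS_FINISHED:
                selectedInputsMask = allSelectedMask;
                break;
            case NORMAL:
                selectedInputsMask = inputSelectable.getAsInt();
                break;
            case NOT_ALL_DATA_INPUTS_FINISHED:
                selectedInputsMask =
                        (inputSelectable.getAsInt() | dataFinishedButNotPartition) & allSelectedMask;
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    int getSelectedInputsMask() {
        return selectedInputsMask;
    }
}
```

The extra cost is moved to `setDataFinishedOnInput()`, which runs at most once per input.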




-- 
This is an automated message from the Apache Git Service.