pnowojski commented on a change in pull request #13529:
URL: https://github.com/apache/flink/pull/13529#discussion_r501001585



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -260,20 +259,6 @@ private boolean allStreamStatusesAreIdle() {
                return true;
        }
 
-       private static class SourceInputProcessor<T> extends 
StreamOneInputProcessor<T> {

Review comment:
       Nit, change commit title to:
   > [hotfix] Remove SourceInputProcessor in StreamMultipleInputProcessor

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A factory for {@link StreamMultipleInputProcessor}.
+ */
+@Internal
+public class StreamMultipleInputProcessorFactory {
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public static StreamMultipleInputProcessor create(
+                       CheckpointedInputGate[] checkpointedInputGates,
+                       StreamConfig.InputConfig[] configuredInputs,
+                       IOManager ioManager,
+                       TaskIOMetricGroup ioMetricGroup,
+                       Counter mainOperatorRecordsIn,
+                       StreamStatusMaintainer streamStatusMaintainer,
+                       MultipleInputStreamOperator<?> mainOperator,
+                       MultipleInputSelectionHandler inputSelectionHandler,
+                       WatermarkGauge[] inputWatermarkGauges,
+                       OperatorChain<?, ?> operatorChain) {
+               checkNotNull(operatorChain);
+               checkNotNull(inputSelectionHandler);
+
+               List<Input> operatorInputs = mainOperator.getInputs();
+               int inputsCount = operatorInputs.size();
+
+               StreamOneInputProcessor<?>[] inputProcessors = new 
StreamOneInputProcessor[inputsCount];
+               Counter networkRecordsIn = new SimpleCounter();
+               ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
+
+               MultiStreamStreamStatusTracker streamStatusTracker = new 
MultiStreamStreamStatusTracker(inputsCount);
+               checkState(
+                       configuredInputs.length == inputsCount,
+                       "Number of configured inputs in StreamConfig [%s] 
doesn't match the main operator's number of inputs [%s]",
+                       configuredInputs.length,
+                       inputsCount);
+               for (int i = 0; i < inputsCount; i++) {
+                       StreamConfig.InputConfig configuredInput = 
configuredInputs[i];
+                       if (configuredInput instanceof 
StreamConfig.NetworkInputConfig) {
+                               StreamConfig.NetworkInputConfig networkInput = 
(StreamConfig.NetworkInputConfig) configuredInput;
+                               StreamTaskNetworkOutput dataOutput = new 
StreamTaskNetworkOutput<>(
+                                       operatorInputs.get(i),
+                                       streamStatusMaintainer,
+                                       inputWatermarkGauges[i],
+                                       inputSelectionHandler, 
streamStatusTracker,
+                                       i,
+                                       mainOperatorRecordsIn,
+                                       networkRecordsIn);
+
+                               inputProcessors[i] = new 
StreamOneInputProcessor(
+                                       new StreamTaskNetworkInput<>(
+                                               
checkpointedInputGates[networkInput.getInputGateIndex()],
+                                               
networkInput.getTypeSerializer(),
+                                               ioManager,
+                                               new 
StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
+                                               i),
+                                       dataOutput,
+                                       operatorChain);
+                       }
+                       else if (configuredInput instanceof 
StreamConfig.SourceInputConfig) {
+                               StreamConfig.SourceInputConfig sourceInput = 
(StreamConfig.SourceInputConfig) configuredInput;
+                               Output<StreamRecord<?>> chainedSourceOutput = 
operatorChain.getChainedSourceOutput(sourceInput);
+                               StreamTaskSourceInput<?> sourceTaskInput = 
operatorChain.getSourceTaskInput(sourceInput);
+
+                               inputProcessors[i] = new 
StreamOneInputProcessor(
+                                       sourceTaskInput,
+                                       new 
StreamTaskSourceOutput(chainedSourceOutput, streamStatusMaintainer, 
inputWatermarkGauges[i],
+                                               streamStatusTracker,
+                                               i),
+                                       operatorChain);
+                       }
+                       else {
+                               throw new 
UnsupportedOperationException("Unknown input type: " + configuredInput);
+                       }
+               }
+
+               return new StreamMultipleInputProcessor(
+                       inputSelectionHandler,
+                       inputProcessors
+               );
+       }
+
+
+       /**
+        * Stream status tracker for the inputs. We need to keep track for 
determining when
+        * to forward stream status changes downstream.
+        */
+       private static class MultiStreamStreamStatusTracker {
+               private final StreamStatus[] streamStatuses;
+
+               private MultiStreamStreamStatusTracker(int numberOfInputs) {
+                       this.streamStatuses = new StreamStatus[numberOfInputs];
+                       Arrays.fill(streamStatuses, StreamStatus.ACTIVE);
+               }
+
+               public void setStreamStatus(int index, StreamStatus 
streamStatus) {
+                       streamStatuses[index] = streamStatus;
+               }
+
+               public StreamStatus getStreamStatus(int index) {
+                       return streamStatuses[index];
+               }
+
+               public boolean allStreamStatusesAreIdle() {
+                       for (StreamStatus streamStatus : streamStatuses) {
+                               if (streamStatus.isActive()) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+       }

Review comment:
       Do I understand correctly, that previously this functionality was 
hidden/implemented by non static accesses between 
`StreamMultipleInputProcessor#StreamTaskNetworkOutput` (non static class) and 
`StreamMultipleInputProcessor`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -171,6 +168,10 @@ private int selectNextReadingInputIndex() throws 
IOException {
                updateAvailability();
                checkInputSelectionAgainstIsFinished();
 
+               if (inputSelectionHandler.isInputUnavailable(0) && 
inputSelectionHandler.isInputUnavailable(1)) {
+                       fullCheckAndSetAvailable();
+               }

Review comment:
       Again as explained above, `StreamTwoInputProcessor#updateAvailability()` 
should do the trick (% the starvation check in L181)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/TwoInputSelectionHandler.java
##########
@@ -70,6 +70,10 @@ boolean areAllInputsSelected() {
                return inputSelection.areAllInputsSelected();
        }
 
+       boolean isInputUnavailable(int inputIndex) {

Review comment:
       nit: I would flip the method to `isInputAvailable(...)` and use it with 
`!` operator

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -124,6 +120,7 @@ private int selectFirstReadingInputIndex() throws 
IOException {
        }
 
        private void checkFinished(InputStatus status) throws Exception {
+               updateAvailability();

Review comment:
       why do we need this? The currently processed input should be updated via 
the returned `InputStatus`, no need to check it. If the other input is 
unavailable, and both will become unavailable, we would do the full check via 
waiting on availability future and we update the inputs (via cheap 
`isAproximatelyAvailable()`) once future is completed.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted 
independently and afterwards
+ * the inputs are being merged in order. It is done, by reporting availability 
only for the input which is
+ * current head for the sorted inputs.
+ */
+public class MultiInputSortingDataInputs<K> {

Review comment:
       Is this class being used anywhere in this commit besides in tests?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/EndOfInputAware.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+
+/**
+ * An interface for {@link OperatorChain} to extract the feature of ending the 
input.
+ *
+ * <p>It's purpose is mainly to make it easier to instantiate {@link 
StreamOneInputProcessor} which needs to
+ * notify the chain that an input has ended.
+ */
+@Internal
+public interface EndOfInputAware {
+
+       /**
+        * Ends the main operator input specified by {@code inputId}).
+        *
+        * @param inputId the input ID starts from 1 which indicates the first 
input.
+        */
+       void endMainOperatorInput(int inputId) throws Exception;
+}

Review comment:
       1. name `MainOperator` doesn't fit here I think. I would rename it to 
`endInput`, but...
   2 why not use `BoundedMultiInput` interface instead? It's basically the same 
thing 😅 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -23,16 +23,12 @@
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

Review comment:
       re
   > [hotfix] Fix availability handling in TwoInputStreamOperator
   
   0. Can you explain a bit more what's the actual issue (I don't see what is 
this commit fixing)?
   1. Can you create a bug ticket in the jira about about this and change 
commit from hotfix to [FLINK-XYZ]?
   2. Definitely this commit is missing some test. If you will struggle to 
reproduce (`StreamTwoInputProcessor` is tested on the task level in 
`StreamTaskSelectiveReadingTest` and `TwoInputStreamTaskTest`. Note that 
`TwoInputStreamTaskTestHarness` that is used there is in the process of being 
deprecated and:
     a) it would be just nice to implement the test based on 
`StreamTaskMailboxTestHarness`. So far it's mostly used for multiple input 
tasks tests, but it has been used in source tasks and one input tasks tests as 
well (for example `StreamTaskTest#testNotifyCheckpointOnClosedOperator`, so it 
should be easy to use for the two inputs as well.
     b) mailbox version of the test harness might be much much better at 
reproducing a problem that depends on timing as you can execute it step by step
   3. it would be best actually to merge this commit in separate PR to verify 
the performance impact of this bug fix alone (to narrow down suspects in case 
of regression detected after merging)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A factory for {@link StreamMultipleInputProcessor}.
+ */
+@Internal
+public class StreamMultipleInputProcessorFactory {
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public static StreamMultipleInputProcessor create(
+                       CheckpointedInputGate[] checkpointedInputGates,
+                       StreamConfig.InputConfig[] configuredInputs,
+                       IOManager ioManager,
+                       TaskIOMetricGroup ioMetricGroup,
+                       Counter mainOperatorRecordsIn,
+                       StreamStatusMaintainer streamStatusMaintainer,
+                       MultipleInputStreamOperator<?> mainOperator,
+                       MultipleInputSelectionHandler inputSelectionHandler,
+                       WatermarkGauge[] inputWatermarkGauges,
+                       OperatorChain<?, ?> operatorChain) {
+               checkNotNull(operatorChain);
+               checkNotNull(inputSelectionHandler);
+
+               List<Input> operatorInputs = mainOperator.getInputs();
+               int inputsCount = operatorInputs.size();
+
+               StreamOneInputProcessor<?>[] inputProcessors = new 
StreamOneInputProcessor[inputsCount];
+               Counter networkRecordsIn = new SimpleCounter();
+               ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
+
+               MultiStreamStreamStatusTracker streamStatusTracker = new 
MultiStreamStreamStatusTracker(inputsCount);
+               checkState(
+                       configuredInputs.length == inputsCount,
+                       "Number of configured inputs in StreamConfig [%s] 
doesn't match the main operator's number of inputs [%s]",
+                       configuredInputs.length,
+                       inputsCount);
+               for (int i = 0; i < inputsCount; i++) {
+                       StreamConfig.InputConfig configuredInput = 
configuredInputs[i];
+                       if (configuredInput instanceof 
StreamConfig.NetworkInputConfig) {
+                               StreamConfig.NetworkInputConfig networkInput = 
(StreamConfig.NetworkInputConfig) configuredInput;
+                               StreamTaskNetworkOutput dataOutput = new 
StreamTaskNetworkOutput<>(
+                                       operatorInputs.get(i),
+                                       streamStatusMaintainer,
+                                       inputWatermarkGauges[i],
+                                       inputSelectionHandler, 
streamStatusTracker,
+                                       i,
+                                       mainOperatorRecordsIn,
+                                       networkRecordsIn);
+
+                               inputProcessors[i] = new 
StreamOneInputProcessor(
+                                       new StreamTaskNetworkInput<>(
+                                               
checkpointedInputGates[networkInput.getInputGateIndex()],
+                                               
networkInput.getTypeSerializer(),
+                                               ioManager,
+                                               new 
StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
+                                               i),
+                                       dataOutput,
+                                       operatorChain);
+                       }
+                       else if (configuredInput instanceof 
StreamConfig.SourceInputConfig) {
+                               StreamConfig.SourceInputConfig sourceInput = 
(StreamConfig.SourceInputConfig) configuredInput;
+                               Output<StreamRecord<?>> chainedSourceOutput = 
operatorChain.getChainedSourceOutput(sourceInput);
+                               StreamTaskSourceInput<?> sourceTaskInput = 
operatorChain.getSourceTaskInput(sourceInput);
+
+                               inputProcessors[i] = new 
StreamOneInputProcessor(
+                                       sourceTaskInput,
+                                       new 
StreamTaskSourceOutput(chainedSourceOutput, streamStatusMaintainer, 
inputWatermarkGauges[i],
+                                               streamStatusTracker,
+                                               i),

Review comment:
       nit: formatting

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortedInputITCase.java
##########
@@ -52,13 +56,16 @@
 import static org.junit.Assert.assertThat;
 
 /**
- * Longer running IT tests for {@link SortingDataInputTest}. For quicker smoke 
tests see {@link SortingDataInputTest}.
+ * Longer running IT tests for {@link SortingDataInput} and {@link 
MultiInputSortingDataInputs}.
+ *
+ * @see SortingDataInputTest
+ * @see MultiInputSortingDataInputsTest
  */
-public class SortingDataInputITCase {
+public class SortedInputITCase {

Review comment:
       Names are here a bit confusing. `Sorted` vs `Sorting` and "longer 
running" is encoded as `ITCase` vs `Test`.
   
   Maybe:
   `SortedInputITCase` -> `LargeSortedInputITCase` (or 
`LargeSortingInputITCase`)
   
   Also maybe unify `SortingDataInput` vs `SortedInput`, or does it make sense 
as it is?
   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
##########
@@ -40,38 +48,78 @@
  */
 public class StreamTwoInputProcessorFactory {
        public static <IN1, IN2> StreamTwoInputProcessor<IN1, IN2> create(
+                       AbstractInvokable ownerTask,
                        CheckpointedInputGate[] checkpointedInputGates,
-                       TypeSerializer<IN1> inputSerializer1,
-                       TypeSerializer<IN2> inputSerializer2,
                        IOManager ioManager,
+                       MemoryManager memoryManager,
                        TaskIOMetricGroup taskIOMetricGroup,
                        StreamStatusMaintainer streamStatusMaintainer,
                        TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
                        TwoInputSelectionHandler inputSelectionHandler,
                        WatermarkGauge input1WatermarkGauge,
                        WatermarkGauge input2WatermarkGauge,
                        OperatorChain<?, ?> operatorChain,
+                       StreamConfig streamConfig,
+                       Configuration taskManagerConfig,
+                       Configuration jobConfig,
+                       ExecutionConfig executionConfig,
+                       ClassLoader userClassloader,
                        Counter numRecordsIn) {
 
                checkNotNull(operatorChain);
                checkNotNull(inputSelectionHandler);
+
                StreamStatusTracker statusTracker = new StreamStatusTracker();
                taskIOMetricGroup.reuseRecordsInputCounter(numRecordsIn);
+               TypeSerializer<IN1> typeSerializer1 = 
streamConfig.getTypeSerializerIn(0, userClassloader);
+               StreamTaskInput<IN1> input1 = new StreamTaskNetworkInput<>(
+                       checkpointedInputGates[0],
+                       typeSerializer1,
+                       ioManager,
+                       new 
StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()),
+                       0);
+               TypeSerializer<IN2> typeSerializer2 = 
streamConfig.getTypeSerializerIn(1, userClassloader);
+               StreamTaskInput<IN2> input2 = new StreamTaskNetworkInput<>(
+                       checkpointedInputGates[1],
+                       typeSerializer2,
+                       ioManager,
+                       new 
StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()),
+                       1);
+
+               if (streamConfig.shouldSortInputs()) {
+                       @SuppressWarnings("unchecked")
+                       MultiInputSortingDataInputs<Object> multiInputs = new 
MultiInputSortingDataInputs<Object>(

Review comment:
       (I remember someone complaining about unchecked warnings 🙈 and `Object`? 
😈  )

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -67,7 +63,8 @@ public StreamTwoInputProcessor(
 
        @Override
        public CompletableFuture<?> getAvailableFuture() {
-               if (inputSelectionHandler.areAllInputsSelected()) {
+               if (inputSelectionHandler.areAllInputsSelected() ||
+                               (inputSelectionHandler.isInputUnavailable(0) && 
!inputSelectionHandler.isInputUnavailable(1))) {

Review comment:
       I don't get this condition. Why first has to be unavailable, while 
second available?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to