pnowojski commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494275631



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
                        taskIOMetricGroup,
                        taskName,
                        mailboxExecutor,
-                       Arrays.asList(inputGates));
+                       new List[]{ Arrays.asList(inputGates) },

Review comment:
       It used to be like that, but now we are passing two different 
lists, so having a vararg for just one of them would be a bit inconsistent.
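
   To illustrate the consistency argument, here is a minimal sketch. The class and method names (`VarargConsistencySketch`, `countWithVararg`, `countWithLists`) are hypothetical and are not part of Flink; the point is only that Java allows a single trailing vararg, so a method taking two groups of inputs could expose at most one of them that way.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
final class VarargConsistencySketch {

    // With a single group of inputs, a trailing vararg reads naturally.
    static int countWithVararg(String taskName, String... gates) {
        return gates.length;
    }

    // With two groups, only the last parameter could be a vararg,
    // so both groups are passed as explicit lists instead.
    static int countWithLists(String taskName, List<String> networkGates, List<String> sourceInputs) {
        return networkGates.size() + sourceInputs.size();
    }

    public static void main(String[] args) {
        System.out.println(countWithVararg("task", "gate-0", "gate-1"));
        System.out.println(countWithLists(
            "task",
            Arrays.asList("gate-0", "gate-1"),
            Arrays.asList("source-0")));
    }
}
```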

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {

Review comment:
       No. This field is used only in `SourceStreamTask`. The 
`MetricNames#CHECKPOINT_START_DELAY_TIME` metric is defined in two ways:
   
   1. in `SourceStreamTask`, via `latestAsyncCheckpointStartDelayNanos`
   2. everywhere else, via `CheckpointBarrierHandler#getCheckpointStartDelayNanos`
   
   `MultipleInputStreamTask` uses the second way.
   
   I've added a comment about that.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
  * unavailable or finished.
  */
 @Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
 
        private final SourceOperator<T, ?> operator;
+       private final int inputGateIndex;
+       private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+       private final List<InputChannelInfo> inputChannelInfos;
 
-       public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+       public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
                this.operator = checkNotNull(operator);
+               this.inputGateIndex = inputGateIndex;
+               inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+               isBlockedAvailability.resetAvailable();
        }
 
        @Override
        public InputStatus emitNext(DataOutput<T> output) throws Exception {
+               if (!isBlockedAvailability.isApproximatelyAvailable()) {
+                       // Safe guard

Review comment:
       Expanded the comment.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
                        allBarriersReceivedFuture = new CompletableFuture<>();
                        checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
 
-                       for (final InputGate gate : inputGates) {
-                               for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
-                                       gate.getChannel(index).checkpointStarted(barrier);
-                               }
+                       for (final BlockableInput input : inputs) {
+                               input.checkpointStarted(barrier);

Review comment:
       Good question. I've added a longer explanation of why in 
`StreamTaskSourceInput#checkpointStarted`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {
+                                       pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                                       triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+                               }
+                               catch (Exception ex) {
+                                       // Report the failure both via the Future result but also to the mailbox
+                                       pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                                       resultFuture.completeExceptionally(ex);
+                                       throw ex;
+                               }
+                       },
+                       "checkpoint %s with %s",
+                       metadata,
+                       options);
+               return resultFuture;
+       }
+
+       private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
+               for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {

Review comment:
       There is no need to do that. Please check the updated Javadoc in 
`StreamTaskSourceInput#checkpointStarted`.
   
   The runtime has the flexibility to checkpoint the sources at any point in time, as 
long as it stays in sync with the network inputs.
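   
   For readers who want to see the shape of the hunk above outside of Flink, here is a rough, self-contained sketch of the pattern: register a pending future, trigger the sources from the mailbox thread, and on failure both fail the future and rethrow. Everything in it is an assumption made for illustration: `PendingCheckpointSketch`, `triggerAsync`, `complete` and `pendingCount` are hypothetical names, a plain `Executor` stands in for the mailbox executor, and a `LongConsumer` stands in for `triggerSourcesCheckpoint`. Where the real task completes the pending future is not shown in this diff; the `complete` method below is only a placeholder for that step.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.LongConsumer;

// Hypothetical stand-in for the pattern in the diff; not Flink code.
final class PendingCheckpointSketch {

    private final Map<Long, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();
    private final Executor mailbox;            // assumed stand-in for the mailbox executor
    private final LongConsumer sourceTrigger;  // assumed stand-in for triggering the sources

    PendingCheckpointSketch(Executor mailbox, LongConsumer sourceTrigger) {
        this.mailbox = mailbox;
        this.sourceTrigger = sourceTrigger;
    }

    CompletableFuture<Boolean> triggerAsync(long checkpointId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(() -> {
            try {
                // Register first, so a later completion can find the future.
                pending.put(checkpointId, result);
                sourceTrigger.accept(checkpointId);
            } catch (RuntimeException ex) {
                // Report the failure via the future and to the executor.
                pending.remove(checkpointId);
                result.completeExceptionally(ex);
                throw ex;
            }
        });
        return result;
    }

    // Placeholder for whatever signals that the checkpoint has completed.
    void complete(long checkpointId) {
        CompletableFuture<Boolean> future = pending.remove(checkpointId);
        if (future != null) {
            future.complete(true);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCheckpointSketch task = new PendingCheckpointSketch(Runnable::run, id -> { });
        CompletableFuture<Boolean> future = task.triggerAsync(1L);
        task.complete(1L);
        System.out.println("completed: " + future.join());
    }
}
```

   In this sketch the future stays pending until `complete` is called, and a failing trigger leaves no entry behind in the map.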
   
   



