pnowojski commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494275631
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
taskIOMetricGroup,
taskName,
mailboxExecutor,
- Arrays.asList(inputGates));
+ new List[]{ Arrays.asList(inputGates) },
Review comment:
It used to be like that, but now we are passing two different lists, so making only one of them a vararg would be a bit inconsistent.
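A minimal sketch of the call-site shape, assuming a hypothetical `countInputs` method that, like the changed factory, receives an array of lists plus a second, separate list. The names and element types are illustrative, since the real signature is only partially visible in the diff. With both parameters passed explicitly, the single-group case needs the same `new List[]{ ... }` wrapping as the diff:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a factory that takes two kinds of inputs.
// Names and element types are illustrative, not Flink's actual signature.
class TwoListFactorySketch {

    // Neither parameter is a vararg: grouped inputs arrive as an array of
    // lists, the other inputs as a plain list.
    static int countInputs(List<String>[] inputGroups, List<String> otherInputs) {
        int total = otherInputs.size();
        for (List<String> group : inputGroups) {
            total += group.size();
        }
        return total;
    }

    @SuppressWarnings("unchecked")
    static int singleGroupExample() {
        String[] inputGates = {"gate-0", "gate-1"};
        // Mirrors the call site in the diff: one group wrapped in a one-element
        // array. Generic array creation is not allowed, hence the raw `new List[]`.
        List<String>[] groups = new List[] {Arrays.asList(inputGates)};
        return countInputs(groups, Arrays.asList("other-0"));
    }
}
```

In this sketch a vararg would also have to move to the last position, since Java only allows a vararg as the final parameter; keeping both as plain parameters leaves the call site symmetric.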
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
Review comment:
No. The `latestAsyncCheckpointStartDelayNanos` field is used only in `SourceStreamTask`.
The `MetricNames#CHECKPOINT_START_DELAY_TIME` metric is provided in one of two ways:
1. in `SourceStreamTask`, via `latestAsyncCheckpointStartDelayNanos`;
2. everywhere else, via `CheckpointBarrierHandler#getCheckpointStartDelayNanos`.
`MultipleInputStreamTask` uses the second way.
I've added a comment about that.
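The two paths can be modeled roughly as follows. This is a sketch under an assumption: the delay is taken to be the wall-clock time between the checkpoint's timestamp and the moment the task starts working on that checkpoint. `StartDelaySource`, `SourceTaskStartDelay`, `BarrierHandlerStartDelay`, `onCheckpointTriggered`, and `onFirstBarrier` are hypothetical names; only the two field/method names in the comment come from Flink.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the two paths behind the checkpoint start delay metric.
// Assumption: delay = time from the checkpoint's timestamp until the task
// starts working on that checkpoint.
interface StartDelaySource {
    long checkpointStartDelayNanos();
}

// Path 1 (modeled on SourceStreamTask): no barriers arrive, so the task
// records the delay itself when the checkpoint is triggered.
final class SourceTaskStartDelay implements StartDelaySource {
    private volatile long latestAsyncCheckpointStartDelayNanos;

    void onCheckpointTriggered(long checkpointTimestampMillis, long nowMillis) {
        latestAsyncCheckpointStartDelayNanos = delayNanos(checkpointTimestampMillis, nowMillis);
    }

    @Override
    public long checkpointStartDelayNanos() {
        return latestAsyncCheckpointStartDelayNanos;
    }

    static long delayNanos(long fromMillis, long toMillis) {
        // Clamp at zero so clock skew never yields a negative delay.
        return TimeUnit.MILLISECONDS.toNanos(Math.max(0, toMillis - fromMillis));
    }
}

// Path 2 (modeled on the barrier handler): every other task, including a
// multiple-input task in this model, records the delay when the first
// barrier of a checkpoint is seen.
final class BarrierHandlerStartDelay implements StartDelaySource {
    private volatile long latestCheckpointStartDelayNanos;

    void onFirstBarrier(long barrierTimestampMillis, long nowMillis) {
        latestCheckpointStartDelayNanos =
                SourceTaskStartDelay.delayNanos(barrierTimestampMillis, nowMillis);
    }

    @Override
    public long checkpointStartDelayNanos() {
        return latestCheckpointStartDelayNanos;
    }
}
```

The metric reads from exactly one `StartDelaySource` per task, which is why a task that uses the barrier-handler path has no reason to touch the source task's field.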
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
* unavailable or finished.
*/
@Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
private final SourceOperator<T, ?> operator;
+ private final int inputGateIndex;
+ private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+ private final List<InputChannelInfo> inputChannelInfos;
- public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+ public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
this.operator = checkNotNull(operator);
+ this.inputGateIndex = inputGateIndex;
+ inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+ isBlockedAvailability.resetAvailable();
}
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
+ if (!isBlockedAvailability.isApproximatelyAvailable()) {
+ // Safe guard
Review comment:
I've expanded this comment.
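The expanded comment itself is not shown here. As a rough illustration of what such a safeguard does, here is a minimal sketch, assuming blocking is modeled with a `CompletableFuture` (a stand-in for Flink's `AvailabilityHelper`) and that the hypothetical `BlockableSourceSketch` replaces the real source operator with an in-memory queue:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a source input that can be blocked,
// e.g. while a checkpoint is in progress. Not Flink's StreamTaskSourceInput.
final class BlockableSourceSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Queue<String> records;
    // A completed future means "unblocked"; a pending one means "blocked".
    private CompletableFuture<Void> unblocked = CompletableFuture.completedFuture(null);

    BlockableSourceSketch(String... records) {
        this.records = new ArrayDeque<>(Arrays.asList(records));
    }

    void block() {
        if (unblocked.isDone()) {
            unblocked = new CompletableFuture<>();
        }
    }

    void resume() {
        unblocked.complete(null);
    }

    // Well-behaved callers wait on this future instead of polling.
    CompletableFuture<Void> getAvailableFuture() {
        return unblocked;
    }

    Status emitNext(List<String> output) {
        // Safeguard: even if a caller ignores the availability future and
        // polls anyway, a blocked input emits nothing.
        if (!unblocked.isDone()) {
            return Status.NOTHING_AVAILABLE;
        }
        String next = records.poll();
        if (next == null) {
            return Status.END_OF_INPUT;
        }
        output.add(next);
        return Status.MORE_AVAILABLE;
    }
}
```

The check duplicates what the availability future already expresses; it only matters if some caller polls a blocked input, which is exactly the case a safeguard is meant to catch.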
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
allBarriersReceivedFuture = new CompletableFuture<>();
checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
- for (final InputGate gate : inputGates) {
- for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
- gate.getChannel(index).checkpointStarted(barrier);
- }
+ for (final BlockableInput input : inputs) {
+ input.checkpointStarted(barrier);
Review comment:
Good question. I've added a longer explanation of why in `StreamTaskSourceInput#checkpointStarted`.
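The refactoring in this hunk replaces a nested gate/channel loop with one uniform loop over inputs, each of which decides how to react. A minimal sketch of that shape, with hypothetical names (`CheckpointAwareInput`, `NetworkInputSketch`, `SourceInputSketch`, `UnalignerSketch`) and a plain `long` checkpoint ID instead of a `CheckpointBarrier`; what the source input does here is a placeholder, not the behavior documented in `StreamTaskSourceInput#checkpointStarted`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each input reacts to the start of a checkpoint itself,
// so the barrier handler no longer needs to know about gates and channels.
interface CheckpointAwareInput {
    void checkpointStarted(long checkpointId);
}

// Network input: fans the notification out to every channel, as the removed
// nested loop used to do.
final class NetworkInputSketch implements CheckpointAwareInput {
    final List<String> notifications = new ArrayList<>();
    private final int numChannels;

    NetworkInputSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    @Override
    public void checkpointStarted(long checkpointId) {
        for (int channel = 0; channel < numChannels; channel++) {
            notifications.add("channel-" + channel + "@" + checkpointId);
        }
    }
}

// Source input: has no network channels; it just records the call here.
final class SourceInputSketch implements CheckpointAwareInput {
    final List<Long> startedCheckpoints = new ArrayList<>();

    @Override
    public void checkpointStarted(long checkpointId) {
        startedCheckpoints.add(checkpointId);
    }
}

final class UnalignerSketch {
    static void onFirstBarrier(List<CheckpointAwareInput> inputs, long checkpointId) {
        // One loop, regardless of the kind of input.
        for (CheckpointAwareInput input : inputs) {
            input.checkpointStarted(checkpointId);
        }
    }
}
```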
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
+ pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+ triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+ }
+ catch (Exception ex) {
+ // Report the failure both via the Future result but also to the mailbox
+ pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+ resultFuture.completeExceptionally(ex);
+ throw ex;
+ }
+ },
+ "checkpoint %s with %s",
+ metadata,
+ options);
+ return resultFuture;
+ }
+
+ private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
+ for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {
Review comment:
There is no need to do that. Please check the updated Javadoc in `StreamTaskSourceInput#checkpointStarted`.
The runtime has the flexibility to checkpoint the sources at any point in time, as long as it does so in sync with the network inputs.
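The trigger path from the `triggerCheckpointAsync` hunk can be modeled outside Flink as follows. This is a minimal sketch under stated assumptions: a single-threaded `ExecutorService` stands in for `mainMailboxExecutor`, checkpoints are identified by a plain `long` instead of `CheckpointMetaData`, and `MailboxTriggerSketch`, `runInMailbox`, `checkpointCompleted`, and the `failSourceTrigger` flag are hypothetical names. It keeps the two properties visible in the diff: the pending future is registered and the sources are triggered inside the mailbox, and a trigger failure is reported both through the returned future and to the mailbox.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the triggerCheckpointAsync pattern.
// A single-threaded executor stands in for the task's mailbox.
final class MailboxTriggerSketch {
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }

    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Map<Long, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();
    // Failures that escaped a mailbox action (the "report to the mailbox" half).
    final Queue<Exception> mailboxFailures = new ConcurrentLinkedQueue<>();
    private final boolean failSourceTrigger;

    MailboxTriggerSketch(boolean failSourceTrigger) {
        this.failSourceTrigger = failSourceTrigger;
    }

    private void runInMailbox(ThrowingAction action) {
        mailbox.execute(() -> {
            try {
                action.run();
            } catch (Exception e) {
                mailboxFailures.add(e);
            }
        });
    }

    Future<Boolean> triggerCheckpointAsync(long checkpointId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        runInMailbox(() -> {
            try {
                pending.put(checkpointId, result);
                triggerSourcesCheckpoint(checkpointId);
            } catch (Exception ex) {
                // Report the failure via the future and, by rethrowing, to the mailbox.
                pending.remove(checkpointId);
                result.completeExceptionally(ex);
                throw ex;
            }
        });
        return result;
    }

    private void triggerSourcesCheckpoint(long checkpointId) throws Exception {
        if (failSourceTrigger) {
            throw new Exception("could not trigger sources for checkpoint " + checkpointId);
        }
    }

    // Completes the pending future once the checkpoint has been performed.
    void checkpointCompleted(long checkpointId) {
        runInMailbox(() -> {
            CompletableFuture<Boolean> future = pending.remove(checkpointId);
            if (future != null) {
                future.complete(true);
            }
        });
    }

    void close() throws InterruptedException {
        mailbox.shutdown();
        mailbox.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because both the trigger and the completion run on the single mailbox thread, they cannot race with each other or with any other mailbox action.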