rkhachatryan commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494181467
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityProvider.java
##########
@@ -66,6 +66,32 @@ default boolean isApproximatelyAvailable() {
return getAvailableFuture() == AVAILABLE;
}
+ static CompletableFuture<?> and(CompletableFuture<?> first, CompletableFuture<?> second) {
+ if (first == AVAILABLE) {
+ if (second == AVAILABLE) {
Review comment:
I think less `if` nesting would be more readable:
```
if (first == AVAILABLE && second == AVAILABLE) return AVAILABLE;
else if (first == AVAILABLE) return second;
else if (second == AVAILABLE) return first;
else return allOf(first, second);
```
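For reference, a self-contained sketch of the flattened variant. The class name `AvailabilitySketch` and its local `AVAILABLE` constant are stand-ins for illustration only, not the actual `AvailabilityProvider` members:

```java
import java.util.concurrent.CompletableFuture;

final class AvailabilitySketch {
    // Stand-in for AvailabilityProvider.AVAILABLE (assumption: a shared, already completed future).
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    // Combines two availability futures without nested ifs.
    static CompletableFuture<?> and(CompletableFuture<?> first, CompletableFuture<?> second) {
        if (first == AVAILABLE && second == AVAILABLE) {
            return AVAILABLE;
        } else if (first == AVAILABLE) {
            return second;
        } else if (second == AVAILABLE) {
            return first;
        } else {
            // Neither is the shared constant: wait for both.
            return CompletableFuture.allOf(first, second);
        }
    }
}
```

Note that the first branch could arguably be dropped: when `first == AVAILABLE`, returning `second` already yields `AVAILABLE` if both are available.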
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier,
InputChannelInfo channelIn
allBarriersReceivedFuture = new CompletableFuture<>();
checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
- for (final InputGate gate : inputGates) {
- for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
- gate.getChannel(index).checkpointStarted(barrier);
- }
+ for (final BlockableInput input : inputs) {
+ input.checkpointStarted(barrier);
Review comment:
Why do we block source inputs when we receive a barrier from a
non-source input?
(maybe a comment is missing here or for
`StreamTaskSourceInput.checkpointStarted`)
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail,
String descriptionForma
}
/**
- * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
- * continue running, false if it should stop. In the current design, this method also evaluates all control flag
- * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+ * This helper method handles all special actions from the mailbox.
+ * In the current design, this method also evaluates all control flag changes.
+ * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
* that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+ *
+ * @return true if a mail has been processed.
*/
- private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+ private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+ boolean processed = false;
// Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
// acquired after this point.
if (!mailbox.createBatch()) {
// We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
// mailbox.hasMail() == true.
- return true;
+ return processed;
Review comment:
Why not just return false here?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
* unavailable or finished.
*/
@Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
private final SourceOperator<T, ?> operator;
+ private final int inputGateIndex;
+ private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+ private final List<InputChannelInfo> inputChannelInfos;
- public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+ public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
this.operator = checkNotNull(operator);
+ this.inputGateIndex = inputGateIndex;
+ inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+ isBlockedAvailability.resetAvailable();
}
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
+ if (!isBlockedAvailability.isApproximatelyAvailable()) {
Review comment:
nit: this is a bit difficult to read (maybe just invert the condition?)
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate
createCheckpointedInputGate(
taskIOMetricGroup,
taskName,
mailboxExecutor,
- Arrays.asList(inputGates));
+ new List[]{ Arrays.asList(inputGates) },
Review comment:
How about using a vararg parameter and replacing this with just
`Arrays.asList(inputGates)`?
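A minimal sketch of the vararg idea, assuming the callee currently takes an array of lists (as `new List[]{ ... }` above suggests). `VarargSketch` and `flatten` are hypothetical names, and strings stand in for input gates:

```java
import java.util.ArrayList;
import java.util.List;

final class VarargSketch {
    // A vararg of lists lets callers pass one list directly,
    // without wrapping it in `new List[]{ ... }`.
    @SafeVarargs
    static <T> List<T> flatten(List<T>... groups) {
        List<T> all = new ArrayList<>();
        for (List<T> group : groups) {
            all.addAll(group);
        }
        return all;
    }
}
```

A call site would then read `VarargSketch.flatten(Arrays.asList(inputGates))`, while callers with several groups could still pass them as separate arguments.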
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail,
String descriptionForma
}
/**
- * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
- * continue running, false if it should stop. In the current design, this method also evaluates all control flag
- * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+ * This helper method handles all special actions from the mailbox.
+ * In the current design, this method also evaluates all control flag changes.
+ * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
* that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+ *
+ * @return true if a mail has been processed.
*/
- private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+ private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+ boolean processed = false;
// Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
// acquired after this point.
if (!mailbox.createBatch()) {
// We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
// mailbox.hasMail() == true.
- return true;
+ return processed;
}
// Take mails in a non-blockingly and execute them.
Optional<Mail> maybeMail;
while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
maybeMail.get().run();
+ processed = true;
+ if (singleStep) {
+ break;
+ }
+ }
Review comment:
Why not just return `true` here?
And then, after the loop, if `singleStep == true` we can also return `false`.
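Roughly what I have in mind, as a simplified sketch rather than the real method: a plain `Deque<Runnable>` and a `BooleanSupplier` stand in for `TaskMailbox` and `isMailboxLoopRunning()`, and `ProcessMailSketch` is a hypothetical name.

```java
import java.util.Deque;
import java.util.function.BooleanSupplier;

final class ProcessMailSketch {
    // Returns true if at least one mail has been processed.
    static boolean processMail(Deque<Runnable> batch, BooleanSupplier isRunning, boolean singleStep) {
        if (batch.isEmpty()) {
            return false; // nothing to process, so return the literal
        }
        boolean processed = false;
        Runnable mail;
        while (isRunning.getAsBoolean() && (mail = batch.poll()) != null) {
            mail.run();
            if (singleStep) {
                return true; // exactly one mail was processed
            }
            processed = true;
        }
        // Reached with singleStep only if no mail ran, in which case processed is still false.
        return processed;
    }
}
```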
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
* unavailable or finished.
*/
@Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
private final SourceOperator<T, ?> operator;
+ private final int inputGateIndex;
+ private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+ private final List<InputChannelInfo> inputChannelInfos;
- public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+ public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
this.operator = checkNotNull(operator);
+ this.inputGateIndex = inputGateIndex;
+ inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+ isBlockedAvailability.resetAvailable();
}
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
+ if (!isBlockedAvailability.isApproximatelyAvailable()) {
+ // Safe guard
Review comment:
nit: this comment doesn't say much to me
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
Review comment:
Shouldn't we also update `super.latestAsyncCheckpointStartDelayNanos`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
/**
* A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
* the {@link MultipleInputStreamOperator} to select input for reading.
*/
@Internal
public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+ private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();
Review comment:
I'm concerned about the cleanup of this map.
From the code, I see it's assumed that there is at least one triggerOnBarrier or
abortOnBarrier after triggerAsync, right?
But can an abort come after triggerAsync?
Should we state these ordering assumptions?
Or maybe we can just remove the map? I see the future result is only used by
`SourceStreamTask`, which is irrelevant here.
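To make the concern concrete, a minimal sketch with hypothetical names (`PendingCheckpointsSketch`, `register`, `onTriggered`, `onAborted`), assuming single-threaded access from the mailbox thread. It is not the PR's code; it only shows that an entry is removed solely when a trigger or abort for the same checkpoint id arrives after registration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class PendingCheckpointsSketch {
    private final Map<Long, CompletableFuture<Boolean>> pending = new HashMap<>();

    // Corresponds to the put(...) done when the checkpoint is triggered asynchronously.
    CompletableFuture<Boolean> register(long checkpointId) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pending.put(checkpointId, future);
        return future;
    }

    void onTriggered(long checkpointId) {
        complete(checkpointId, true);
    }

    void onAborted(long checkpointId) {
        complete(checkpointId, false);
    }

    // Removes the entry (if any) and completes its future.
    private void complete(long checkpointId, boolean result) {
        CompletableFuture<Boolean> future = pending.remove(checkpointId);
        if (future != null) {
            future.complete(result);
        }
    }

    int size() {
        return pending.size();
    }
}
```

If `onAborted(id)` runs before `register(id)`, or neither callback ever runs for that id, the entry stays in the map and its future never completes.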
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -178,17 +178,27 @@ public void runMailboxLoop() throws Exception {
final MailboxController defaultActionContext = new MailboxController(this);
- while (runMailboxStep(localMailbox, defaultActionContext)) {
+ while (isMailboxLoopRunning()) {
Review comment:
This is a change in the production code, so I think it's better not to
mark it as `[test]` in the commit message (even though the motivation is to fix
tests).
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
+ pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+ triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+ }
+ catch (Exception ex) {
+ // Report the failure both via the Future result but also to the mailbox
+ pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+ resultFuture.completeExceptionally(ex);
+ throw ex;
+ }
+ },
+ "checkpoint %s with %s",
+ metadata,
+ options);
+ return resultFuture;
+ }
+
+ private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
+ for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {
Review comment:
Shouldn't we differentiate which `sourceInput` the current barrier is for
(and call `processBarrier` only for it)?