pnowojski commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494259177
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail, String descriptionForma
}
/**
- * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
- * continue running, false if it should stop. In the current design, this method also evaluates all control flag
- * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+ * This helper method handles all special actions from the mailbox.
+ * In the current design, this method also evaluates all control flag changes.
+ * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
* that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+ *
+ * @return true if a mail has been processed.
*/
- private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+ private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+ boolean processed = false;
// Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
// acquired after this point.
if (!mailbox.createBatch()) {
// We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
// mailbox.hasMail() == true.
- return true;
+ return processed;
Review comment:
I wanted to make it consistent, but I don't care much one way or
another. Changed to `return false`
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail, String descriptionForma
}
/**
- * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
- * continue running, false if it should stop. In the current design, this method also evaluates all control flag
- * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+ * This helper method handles all special actions from the mailbox.
+ * In the current design, this method also evaluates all control flag changes.
+ * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
* that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+ *
+ * @return true if a mail has been processed.
*/
- private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+ private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+ boolean processed = false;
// Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
// acquired after this point.
if (!mailbox.createBatch()) {
// We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
// mailbox.hasMail() == true.
- return true;
+ return processed;
}
// Take mails in a non-blockingly and execute them.
Optional<Mail> maybeMail;
while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
maybeMail.get().run();
+ processed = true;
+ if (singleStep) {
+ break;
+ }
+ }
Review comment:
Here, actually, I think having one less exit point would be easier to
understand.
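To illustrate the single-exit-point idea, here is a minimal sketch, not the Flink implementation: `SimpleMailbox`, its `Deque<Runnable>` batch and its `running` flag are hypothetical stand-ins for `TaskMailbox`, `createBatch()`/`tryTakeFromBatch()` and `isMailboxLoopRunning()`. The `singleStep` check is folded into the loop condition, so the loop needs no `break`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified stand-in for a mailbox processor; not Flink's MailboxProcessor. */
public class SimpleMailbox {
    private final Deque<Runnable> batch = new ArrayDeque<>();
    private boolean running = true;

    public void put(Runnable mail) {
        batch.add(mail);
    }

    public void stop() {
        running = false;
    }

    /**
     * Runs mails from the current batch.
     *
     * @param singleStep if true, at most one mail is executed
     * @return true if at least one mail has been processed
     */
    public boolean processMail(boolean singleStep) {
        boolean processed = false;
        Runnable mail;
        // The singleStep check is part of the loop condition instead of a separate break.
        while (running && !(singleStep && processed) && (mail = batch.poll()) != null) {
            mail.run();
            processed = true;
        }
        return processed;
    }

    public static void main(String[] args) {
        SimpleMailbox mailbox = new SimpleMailbox();
        int[] counter = {0};
        mailbox.put(() -> counter[0]++);
        mailbox.put(() -> counter[0]++);
        System.out.println(mailbox.processMail(true) + " " + counter[0]);  // true 1
        System.out.println(mailbox.processMail(false) + " " + counter[0]); // true 2
        System.out.println(mailbox.processMail(false) + " " + counter[0]); // false 2
    }
}
```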
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
taskIOMetricGroup,
taskName,
mailboxExecutor,
- Arrays.asList(inputGates));
+ new List[]{ Arrays.asList(inputGates) },
Review comment:
? It used to be like that, but now we are submitting two different
lists, so having a vararg for just one of them would be a bit inconsistent.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
Review comment:
No. This field is used only in the `SourceStreamTask`.
The `MetricNames#CHECKPOINT_START_DELAY_TIME` metric is defined in two ways:
1. in `SourceStreamTask`, via `latestAsyncCheckpointStartDelayNanos`;
2. everywhere else, via `CheckpointBarrierHandler#getCheckpointStartDelayNanos`.

`MultipleInputStreamTask` uses the second way. I've added a comment about that.
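As a rough illustration of the barrier-handler style of measurement, the sketch below derives a start delay from a barrier's creation timestamp. It is an assumption, not Flink code: `StartDelayTracker`, `onFirstBarrier` and the injected millisecond clock are hypothetical, and the delay is assumed to be "now minus barrier timestamp", clamped at zero and reported in nanoseconds.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical tracker for a "checkpoint start delay" metric: the time between the
 * barrier's creation timestamp (epoch millis) and the moment the barrier reaches this task.
 */
public class StartDelayTracker {
    private final LongSupplier clockMillis;
    private long latestStartDelayNanos;

    public StartDelayTracker(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Called when the first barrier of a checkpoint arrives. */
    public void onFirstBarrier(long barrierTimestampMillis) {
        // Clamp at zero, since clocks on different machines may be slightly skewed.
        long delayMillis = Math.max(0L, clockMillis.getAsLong() - barrierTimestampMillis);
        latestStartDelayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    public long getCheckpointStartDelayNanos() {
        return latestStartDelayNanos;
    }

    public static void main(String[] args) {
        StartDelayTracker tracker = new StartDelayTracker(() -> 1_500L);
        tracker.onFirstBarrier(1_000L);
        System.out.println(tracker.getCheckpointStartDelayNanos()); // 500000000
    }
}
```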
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
* unavailable or finished.
*/
@Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
private final SourceOperator<T, ?> operator;
+ private final int inputGateIndex;
+ private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+ private final List<InputChannelInfo> inputChannelInfos;
- public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+ public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
this.operator = checkNotNull(operator);
+ this.inputGateIndex = inputGateIndex;
+ inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+ isBlockedAvailability.resetAvailable();
}
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
+ if (!isBlockedAvailability.isApproximatelyAvailable()) {
+ // Safe guard
Review comment:
expanded
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
allBarriersReceivedFuture = new CompletableFuture<>();
checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
- for (final InputGate gate : inputGates) {
- for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
- gate.getChannel(index).checkpointStarted(barrier);
- }
+ for (final BlockableInput input : inputs) {
+ input.checkpointStarted(barrier);
Review comment:
Good question. I've added a larger explanation why in
`StreamTaskSourceInput#checkpointStarted`
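The change above replaces the nested loop over gates and channels with one polymorphic `checkpointStarted` call per input. A minimal sketch of that idea follows; the types `Input`, `GateInput` and `SourceInput`, and the use of a `long` checkpoint id instead of a `CheckpointBarrier`, are hypothetical simplifications rather than the Flink interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointStartedDemo {
    /** Hypothetical simplified counterpart of BlockableInput. */
    interface Input {
        void checkpointStarted(long checkpointId);
    }

    /** A network-like input that fans the notification out to each of its channels. */
    static class GateInput implements Input {
        final List<String> log;
        final int numChannels;

        GateInput(List<String> log, int numChannels) {
            this.log = log;
            this.numChannels = numChannels;
        }

        @Override
        public void checkpointStarted(long checkpointId) {
            for (int index = 0; index < numChannels; index++) {
                log.add("gate channel " + index + " started " + checkpointId);
            }
        }
    }

    /** A chained source, treated here as a single logical channel. */
    static class SourceInput implements Input {
        final List<String> log;

        SourceInput(List<String> log) {
            this.log = log;
        }

        @Override
        public void checkpointStarted(long checkpointId) {
            log.add("source started " + checkpointId);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Input> inputs = Arrays.asList(new GateInput(log, 2), new SourceInput(log));
        // The caller no longer needs to know how each input is structured internally.
        for (Input input : inputs) {
            input.checkpointStarted(7L);
        }
        log.forEach(System.out::println);
    }
}
```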
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
+ pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+ triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+ }
+ catch (Exception ex) {
+ // Report the failure both via the Future result but also to the mailbox
+ pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+ resultFuture.completeExceptionally(ex);
+ throw ex;
+ }
+ },
+ "checkpoint %s with %s",
+ metadata,
+ options);
+ return resultFuture;
+ }
+
+ private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
+ for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {
Review comment:
There is no need to do that. Please check the updated Javadoc in
`StreamTaskSourceInput#checkpointStarted`.
The runtime has the flexibility to checkpoint the sources at any point in time,
as long as it stays in sync with the network inputs.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
/**
* A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
* the {@link MultipleInputStreamOperator} to select input for reading.
*/
@Internal
public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+ private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();
Review comment:
It’s even worse. The future is currently used only for tests…
I’ve tried to refactor it in some way to avoid it for the
MultipleInputStreamTask, but it turned out to be more difficult than
implementing this Map.
And yes, currently `triggerOnBarrier` always happens after
`triggerCheckpointAsync`.
I also reasoned that if we accumulated some garbage because of some problem
with a cancellation/checkpoint failure that I might have missed, it shouldn't
be a big deal, as long as it's rare? But maybe we can cap its size to 100 *
`CheckpointConfig#getMaxConcurrentCheckpoints`?
However, it might be better to get rid of the future somehow...
What do you think? Do you have any ideas?
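For reference, the pending-future pattern of `triggerCheckpointAsync` can be sketched in isolation as below. This is a simplified assumption, not Flink code: a single-threaded `ExecutorService` stands in for the mailbox executor, and `PendingCheckpoints`, `TriggerAction` and `onBarrierTriggered` (playing the role of `triggerOnBarrier`) are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PendingCheckpoints {
    // Only touched from the single executor thread, so a plain HashMap suffices.
    private final Map<Long, CompletableFuture<Boolean>> pending = new HashMap<>();
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    /** Hypothetical trigger action; may throw to simulate a failure. */
    public interface TriggerAction {
        void trigger(long checkpointId) throws Exception;
    }

    public Future<Boolean> triggerCheckpointAsync(long checkpointId, TriggerAction action) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(() -> {
            pending.put(checkpointId, result);
            try {
                action.trigger(checkpointId);
            } catch (Exception ex) {
                // Forget the checkpoint and report the failure via the future. The code
                // under review also rethrows to the mailbox; this sketch skips that.
                pending.remove(checkpointId);
                result.completeExceptionally(ex);
            }
        });
        return result;
    }

    /** Completes the pending future once the barrier has been processed. */
    public void onBarrierTriggered(long checkpointId) {
        mailbox.execute(() -> {
            CompletableFuture<Boolean> future = pending.remove(checkpointId);
            if (future != null) {
                future.complete(true);
            }
        });
    }

    public void shutdown() {
        mailbox.shutdown();
    }
}
```

Because all map accesses run on the same executor thread, the map needs no synchronization, which mirrors why the task can use a plain `HashMap` when everything happens in the mailbox thread.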
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
taskIOMetricGroup,
taskName,
mailboxExecutor,
- Arrays.asList(inputGates));
+ new List[]{ Arrays.asList(inputGates) },
Review comment:
`inputGates` should probably be migrated to `List` as well at some point.
I don't like using varargs if there are other list/array/collection-like
arguments :(
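Background on `new List[]{ Arrays.asList(inputGates) }`: Java rejects creating an array of a parameterized type such as `new List<IndexedInputGate>[]{...}`, so such code falls back to a raw `List[]`, which only produces an unchecked warning. Passing every collection-like argument as a `List` avoids both varargs and raw arrays. A sketch of that shape, where `countInputs` is hypothetical and `String` stands in for the gate and source-input types:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InputListsDemo {
    /**
     * Hypothetical signature: every collection-like argument is a List, so no
     * argument is a vararg and no raw or generic array has to be created.
     */
    static int countInputs(List<List<String>> inputGateGroups, List<String> sourceInputs) {
        int count = sourceInputs.size();
        for (List<String> group : inputGateGroups) {
            count += group.size();
        }
        return count;
    }

    public static void main(String[] args) {
        String[] inputGates = {"gate-0", "gate-1"};
        int total = countInputs(
                Collections.singletonList(Arrays.asList(inputGates)),
                Collections.singletonList("source-0"));
        System.out.println(total); // 3
    }
}
```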
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
/**
* A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
* the {@link MultipleInputStreamOperator} to select input for reading.
*/
@Internal
public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+ private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();
Review comment:
As discussed offline, while it's not perfect, it's currently easiest to
go on with the current pending map, with a capped size.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,51 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
/**
* A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
* the {@link MultipleInputStreamOperator} to select input for reading.
*/
@Internal
public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+ public static final int MAX_TRACKED_CHECKPOINTS = 100_000;
Review comment:
It can
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +137,81 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
+ /**
+ * Contrary to {@link SourceStreamTask}, we are not using here
+ * {@link StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay
+ * metric, but we will be using {@link CheckpointBarrierHandler#getCheckpointStartDelayNanos()}
+ * instead.
+ */
+ pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+ checkPendingCheckpointCompletedFuturesSize();
+ triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+ }
+ catch (Exception ex) {
+ // Report the failure both via the Future result but also to the mailbox
+ pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+ resultFuture.completeExceptionally(ex);
+ throw ex;
+ }
+ },
+ "checkpoint %s with %s",
+ metadata,
+ options);
+ return resultFuture;
+ }
+
+ private void checkPendingCheckpointCompletedFuturesSize() {
+ while (pendingCheckpointCompletedFutures.size() > MAX_TRACKED_CHECKPOINTS) {
+ Long minCheckpointID = Collections.min(pendingCheckpointCompletedFutures.keySet());
Review comment:
Ehhhh... ok, but that's overengineering a bit :)
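A lighter-weight alternative to looping over `Collections.min(keySet())`, under the assumption that checkpoint ids are inserted in increasing order (so insertion order equals id order), is a `LinkedHashMap` that evicts its eldest entry via `removeEldestEntry`. The sketch below is hypothetical (`CappedPendingMap` and `maxTracked` are illustrative names), and completing evicted futures exceptionally is a design choice of the sketch, not of the code under review.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical bounded map of pending checkpoint futures. Assumes checkpoint ids
 * are inserted in increasing order, so the eldest entry is also the smallest id.
 */
public class CappedPendingMap extends LinkedHashMap<Long, CompletableFuture<Boolean>> {
    private static final long serialVersionUID = 1L;

    private final int maxTracked;

    public CappedPendingMap(int maxTracked) {
        this.maxTracked = maxTracked;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, CompletableFuture<Boolean>> eldest) {
        if (size() > maxTracked) {
            // Do not leave callers waiting forever on a checkpoint we stopped tracking.
            eldest.getValue().completeExceptionally(
                    new CancellationException("Stopped tracking checkpoint " + eldest.getKey()));
            return true;
        }
        return false;
    }
}
```

Each eviction here is O(1), whereas `Collections.min` scans all keys on every eviction. If ids could arrive out of order, a `TreeMap` with `pollFirstEntry()` would keep the "evict the smallest id" semantics.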
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BlockableInput.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Input, with just basic methods for blocking and resuming consumption. It can be for example an {@link InputGate}
+ */
+@Internal
+public interface BlockableInput {
Review comment:
A `blockConsumption` method is added later on, but yes,
`CheckpointableInput` is a better name.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BlockableInput.java
##########
@@ -26,9 +26,12 @@
/**
* Input, with just basic methods for blocking and resuming consumption. It can be for example an {@link InputGate}
+ * or a chained source.
*/
@Internal
public interface BlockableInput {
+ void blockConsumption(int inputChannelIdx);
+
void resumeConsumption(int channelIndex) throws IOException;
Review comment:
As discussed offline, we decided to keep it as it is for now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]