pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r678949633
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -287,13 +292,13 @@ public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
// short circuit the common case (every invocation except the first)
if (currentMainOutput != null) {
- return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
+ return convertToInternalStatus(readingReader.pollNext(currentMainOutput));
Review comment:
I'm a bit confused by this `readingReader` concept co-existing with the
`sourceReader`.
1. If anything, I think it would be better to wrap the existing reader (`sourceReader = new DrainedSourceReader(sourceReader)`) and proxy/override the relevant calls, but...
2. It would seem simpler/cleaner to me to just return `return END_OF_INPUT;` here directly and drop the `DrainedSourceReader` class.
3. Option 2 has the additional benefit of avoiding the presence of another `SourceReader` instance in the class loader. With just one or two instances the JVM is able to devirtualise and inline `pollNext()` calls, so I would avoid adding `DrainedSourceReader` for that reason alone.
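To illustrate option 2, roughly something like this (just a sketch, assuming the operator keeps a flag like the `dataFinished` one introduced in this PR):
```java
// top of SourceOperator#emitNext(...):
if (dataFinished) {
    // the source has been drained via stop-with-savepoint --drain, so answer
    // END_OF_INPUT directly instead of going through a DrainedSourceReader wrapper
    return DataInputStatus.END_OF_INPUT;
}
// short circuit the common case (every invocation except the first)
if (currentMainOutput != null) {
    return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
}
```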
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -114,6 +114,8 @@
/** The source reader that does most of the work. */
private SourceReader<OUT, SplitT> sourceReader;
+ private SourceReader<OUT, SplitT> readingReader;
Review comment:
Missing close in `close()`
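For instance something like this (just a sketch, assuming `readingReader` stays a separate field):
```java
@Override
public void close() throws Exception {
    // also release the drained reader, if it is a separate instance
    if (readingReader != null && readingReader != sourceReader) {
        readingReader.close();
    }
    if (sourceReader != null) {
        sourceReader.close();
    }
    super.close();
}
```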
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -267,6 +270,8 @@ public void finish() throws Exception {
if (eventTimeLogic != null) {
eventTimeLogic.stopPeriodicWatermarkEmits();
}
+ dataFinished = true;
Review comment:
nit: shouldn't this belong to the previous commit?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -68,6 +68,12 @@
*/
private volatile boolean wasStoppedExternally = false;
+ /**
+ * Indicates whether the source operator has been cancelled for stop-with-savepoint --drain, in
+ * this case we want to ignore interrupt exceptions thrown when stopping.
+ */
+ private volatile boolean wasDrained = false;
+
Review comment:
What's the difference between `wasStoppedExternally` and `wasDrained`? According to the javadocs, the first one is for `stop-with-savepoint` with or without drain, while the latter is only for the drain?
If you really need to add another flag, then it's getting a bit too complicated for my taste with all of those flags: `canceled`, `failing`, `wasStoppedExternally`, `wasDrained`, `isRunning`.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -119,9 +113,13 @@ protected void finishTask() throws Exception {
}
@Override
- public Future<Boolean> triggerCheckpointAsync(
+ public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
if (!isExternallyInducedSource) {
+ if (checkpointOptions.getCheckpointType().shouldDrain()) {
+ setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
+ mainMailboxExecutor.execute(this::endData, "Drain pipeline on stop-with-savepoint");
+ }
return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
Review comment:
Is it ok that `endData()` and `triggerCheckpointAsyncXXX()` will be two different mailbox actions, with the potential that something happens in between them?
Even if that's ok for now, it would be somewhat safer for the future to avoid such race conditions. Is there an easy way to execute both things inside a single mailbox action?
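For example, roughly like this (just a sketch; `triggerCheckpointOnMailboxThread(...)` is a hypothetical helper that would run the existing trigger logic directly on the mailbox thread):
```java
if (checkpointOptions.getCheckpointType().shouldDrain()) {
    CompletableFuture<Boolean> triggered = new CompletableFuture<>();
    mainMailboxExecutor.execute(
            () -> {
                // run all steps in one mailbox action, so that no other mail can
                // slip in between ending the data and triggering the savepoint
                setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
                endData();
                triggered.complete(
                        triggerCheckpointOnMailboxThread(checkpointMetaData, checkpointOptions));
            },
            "Drain pipeline on stop-with-savepoint");
    return triggered;
}
return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
```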
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
##########
@@ -120,9 +121,8 @@ void setNext(StreamOperatorWrapper next) {
* MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
* them.
*/
- public void finish(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
- throws Exception {
- if (!isHead && !isStoppingBySyncSavepoint) {
+ public void finish(StreamTaskActionExecutor actionExecutor) throws Exception {
+ if (!isHead || wrapped instanceof StreamSource) {
Review comment:
`StreamSource` getting an `endInput()` call? Do we need that?
If yes, can we maybe place this logic inside `SourceStreamTask`? It will be easier to clean up the code once we drop legacy sources.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1277,10 +1291,14 @@ public final ExecutorService getAsyncOperationsThreadPool() {
private void notifyCheckpointComplete(long checkpointId) throws Exception {
subtaskCheckpointCoordinator.notifyCheckpointComplete(
checkpointId, operatorChain, this::isRunning);
- if (isRunning && isSynchronousSavepointId(checkpointId)) {
- finishTask();
- // Reset to "notify" the internal synchronous savepoint mailbox loop.
- resetSynchronousSavepointId(checkpointId, true);
+ if (isRunning) {
+ if (isCurrentSavepointWithoutDrain(checkpointId)) {
+ finishTask();
+ // Reset to "notify" the internal synchronous savepoint mailbox loop.
+ syncSavepointWithoutDrain = null;
+ } else if (isCurrentSavepointWithDrain(checkpointId)) {
+ savepointCompletedFuture.complete(null);
+ }
Review comment:
Do we have a ticket for unifying this behaviour, as per the offline discussion?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -249,10 +248,28 @@ private void interruptSourceThread(boolean interrupt) {
// ------------------------------------------------------------------------
@Override
- public Future<Boolean> triggerCheckpointAsync(
+ public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
if (!externallyInducedCheckpoints) {
- return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+ if (checkpointOptions.getCheckpointType().shouldDrain()) {
+ mainMailboxExecutor.execute(
+ () -> {
+ setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
+ wasDrained = true;
+ if (mainOperator != null) {
+ mainOperator.stop();
+ }
+ },
+ "stop legacy source and set synchronous savepoint");
+ return sourceThread
+ .getCompletionFuture()
+ .thenCompose(
+ ignore ->
+ super.triggerCheckpointAsync(
+ checkpointMetaData, checkpointOptions));
Review comment:
What if the source doesn't complete? I think it would then behave a little bit differently compared to the current behaviour?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
this.endOfDataReceived = true;
}
- private void resetSynchronousSavepointId(long id, boolean succeeded) {
- if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
- // allow to process further EndOfPartition events
- activeSyncSavepointId = null;
- operatorChain.setIgnoreEndOfInput(false);
- }
- syncSavepointId = null;
- }
-
- private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+ protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
checkState(
- syncSavepointId == null,
+ syncSavepointWithoutDrain == null
+ && (syncSavepointWithDrain == null
+ || (isDrain && syncSavepointWithDrain == checkpointId)),
"at most one stop-with-savepoint checkpoint at a time is allowed");
- syncSavepointId = checkpointId;
- activeSyncSavepointId = checkpointId;
- operatorChain.setIgnoreEndOfInput(ignoreEndOfInput);
+ if (isDrain) {
+ if (syncSavepointWithDrain == null) {
+ syncSavepointWithDrain = checkpointId;
+ savepointCompletedFuture = new CompletableFuture<>();
+ }
+ } else {
+ syncSavepointWithoutDrain = checkpointId;
+ }
}
@VisibleForTesting
OptionalLong getSynchronousSavepointId() {
- return syncSavepointId != null ? OptionalLong.of(syncSavepointId) : OptionalLong.empty();
+ if (syncSavepointWithoutDrain != null) {
+ return OptionalLong.of(syncSavepointWithoutDrain);
+ } else if (syncSavepointWithDrain != null) {
+ return OptionalLong.of(syncSavepointWithDrain);
+ } else {
+ return OptionalLong.empty();
+ }
+ }
+
+ private boolean isCurrentSavepointWithDrain(long checkpointId) {
return syncSavepointWithDrain != null && syncSavepointWithDrain == checkpointId;
Review comment:
nit: do we need the null check in front? If so, maybe `Objects.equals()`?
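i.e. something like this (just a sketch; `Objects.equals` takes care of the `null` case and avoids an accidental unboxing NPE):
```java
private boolean isCurrentSavepointWithDrain(long checkpointId) {
    // java.util.Objects.equals boxes checkpointId and compares via Long#equals
    return Objects.equals(syncSavepointWithDrain, checkpointId);
}
```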
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,25 +298,47 @@ protected void declineCheckpoint(long checkpointId) {
public void run() {
try {
mainOperator.run(lock, operatorChain);
- if (!wasStoppedExternally && !isCanceled() && !isFailing()) {
- synchronized (lock) {
- operatorChain.setIgnoreEndOfInput(false);
- }
- CompletableFuture<Void> endOfDataProcessed = new CompletableFuture<>();
- mainMailboxExecutor.execute(
- () -> {
- endData();
- endOfDataProcessed.complete(null);
- },
- "SourceStreamTask finished processing data.");
-
- // wait until all operators are finished
- endOfDataProcessed.get();
- }
+ completeProcessing();
completionFuture.complete(null);
} catch (Throwable t) {
// Note, t can be also an InterruptedException
- completionFuture.completeExceptionally(t);
+ if (isCanceled()
+ && ExceptionUtils.findThrowable(t, InterruptedException.class).isPresent()) {
+ completionFuture.completeExceptionally(new CancelTaskException(t));
+ } else if (wasDrained
+ && ExceptionUtils.findThrowable(t, InterruptedException.class).isPresent()) {
+ // if we are stopping the source thread for stop-with-savepoint
+ // we may actually return from run with an InterruptedException which
+ // should be ignored
+ try {
+ // clear the interrupted status for the thread
+ Thread.interrupted();
+ completeProcessing();
+ completionFuture.complete(null);
+ } catch (Throwable e) {
+ completionFuture.completeExceptionally(e);
+ }
Review comment:
Why do we need this special treatment? Is this a workaround for [the Kinesis bug](https://issues.apache.org/jira/browse/FLINK-23528)?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,25 +298,47 @@ protected void declineCheckpoint(long checkpointId) {
public void run() {
try {
mainOperator.run(lock, operatorChain);
- if (!wasStoppedExternally && !isCanceled() && !isFailing()) {
- synchronized (lock) {
- operatorChain.setIgnoreEndOfInput(false);
- }
- CompletableFuture<Void> endOfDataProcessed = new CompletableFuture<>();
- mainMailboxExecutor.execute(
- () -> {
- endData();
- endOfDataProcessed.complete(null);
- },
- "SourceStreamTask finished processing data.");
-
- // wait until all operators are finished
- endOfDataProcessed.get();
- }
Review comment:
Haven't you introduced this code earlier in this PR? If so, can you place it inside the `completeProcessing()` method from the beginning, without this refactoring in the last commit?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -251,8 +252,10 @@
/** TODO it might be replaced by the global IO executor on TaskManager level future. */
private final ExecutorService channelIOExecutor;
- private Long syncSavepointId = null;
- private Long activeSyncSavepointId = null;
+ private Long syncSavepointWithoutDrain = null;
+
+ private Long syncSavepointWithDrain = null;
+ private CompletableFuture<Void> savepointCompletedFuture = null;
Review comment:
`syncSavepointWithDrainCompletionFuture`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -249,10 +248,28 @@ private void interruptSourceThread(boolean interrupt) {
// ------------------------------------------------------------------------
@Override
- public Future<Boolean> triggerCheckpointAsync(
+ public CompletableFuture<Boolean> triggerCheckpointAsync(
Review comment:
nit: this method has grown a bit too much and I would split it up somehow. Maybe it's good enough to have the mailbox action as a named method instead of a lambda.
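For example, something along these lines (only a sketch, the method names are just placeholders; `triggerCheckpointAsync` would then only dispatch to this in the drain branch):
```java
private CompletableFuture<Boolean> triggerStopWithSavepointWithDrain(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    mainMailboxExecutor.execute(
            () -> stopLegacySourceForSavepoint(checkpointMetaData.getCheckpointId()),
            "stop legacy source and set synchronous savepoint");
    return sourceThread
            .getCompletionFuture()
            .thenCompose(
                    ignore -> super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions));
}

private void stopLegacySourceForSavepoint(long checkpointId) {
    setSynchronousSavepoint(checkpointId, true);
    wasDrained = true;
    if (mainOperator != null) {
        mainOperator.stop();
    }
}
```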
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -172,14 +178,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
.getCompletionFuture()
.whenComplete(
(Void ignore, Throwable sourceThreadThrowable) -> {
- if (isCanceled()
- && ExceptionUtils.findThrowable(
- sourceThreadThrowable,
- InterruptedException.class)
- .isPresent()) {
- mailboxProcessor.reportThrowable(
- new CancelTaskException(sourceThreadThrowable));
- } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
Review comment:
What has happened to this code?
edit: ok, I see you've mostly moved it, I think. So this is partially just a refactoring? Can you extract the refactoring part as an independent commit?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -249,10 +248,28 @@ private void interruptSourceThread(boolean interrupt) {
// ------------------------------------------------------------------------
@Override
- public Future<Boolean> triggerCheckpointAsync(
+ public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
if (!externallyInducedCheckpoints) {
- return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+ if (checkpointOptions.getCheckpointType().shouldDrain()) {
+ mainMailboxExecutor.execute(
+ () -> {
+ setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
+ wasDrained = true;
+ if (mainOperator != null) {
+ mainOperator.stop();
+ }
+ },
+ "stop legacy source and set synchronous savepoint");
+ return sourceThread
+ .getCompletionFuture()
+ .thenCompose(
Review comment:
Is the change from `Future` to `CompletableFuture` in `triggerCheckpointAsync` because of this `thenCompose`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/AbstractCollectResultBuffer.java
##########
@@ -128,6 +128,9 @@ private void addResults(CollectCoordinationResponse response, long responseOffse
if (!results.isEmpty()) {
// response contains some data, add them to buffer
int addStart = (int) (offset - responseOffset);
+ if (addStart > results.size()) {
+ return;
+ }
Review comment:
Missing unit test?
I'm not sure I understand this fix; maybe a unit test would help me understand it?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -281,11 +277,4 @@ public void abortCheckpointOnBarrier(long checkpointId, CheckpointException caus
}
super.abortCheckpointOnBarrier(checkpointId, cause);
}
-
- @Override
- protected void advanceToEndOfEventTime() throws Exception {
- for (Output<StreamRecord<?>> sourceOutput : operatorChain.getChainedSourceOutputs()) {
- sourceOutput.emitWatermark(Watermark.MAX_WATERMARK);
- }
- }
Review comment:
?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -148,8 +149,8 @@
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
+ * +----> finish-operators()
* +----> close-operators()
- * +----> dispose-operators()
Review comment:
nitty nit: should have been a separate hotfix?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
this.endOfDataReceived = true;
}
- private void resetSynchronousSavepointId(long id, boolean succeeded) {
- if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
- // allow to process further EndOfPartition events
- activeSyncSavepointId = null;
- operatorChain.setIgnoreEndOfInput(false);
- }
- syncSavepointId = null;
- }
-
- private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+ protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
checkState(
- syncSavepointId == null,
+ syncSavepointWithoutDrain == null
+ && (syncSavepointWithDrain == null
+ || (isDrain && syncSavepointWithDrain == checkpointId)),
Review comment:
why this `|| (isDrain && syncSavepointWithDrain == checkpointId)),` part?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]