pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r679975199
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,15 +320,50 @@ protected void declineCheckpoint(long checkpointId) {
        public void run() {
            try {
                mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                }
+                completeProcessing();
                completionFuture.complete(null);
            } catch (Throwable t) {
                // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_DRAIN
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored (e.g. Kinesis case see FLINK-23528)
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN) {
+                    // swallow all exceptions if the source was stopped without drain
+                    completionFuture.complete(null);
+                } else {
+                    completionFuture.completeExceptionally(t);
+                }
+            }
+        }
+
+        private void completeProcessing() throws InterruptedException, ExecutionException {
+            if (finishingReason.shouldCallFinish() && !isCanceled() && !isFailing()) {
Review comment:
Can we unify `finishingReason` with `isCanceled()` or `isFailing()`? Or would
you like to avoid doing that in this PR?
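For illustration only, a rough sketch of what such a unification might look
like; the enum name and the `CANCELED`/`FAILED` constants below are
hypothetical and not part of this PR:

```java
// Hypothetical sketch, not the PR's FinishingReason: fold cancellation and
// failure into the same enum so a single flag decides whether finish() is called.
enum UnifiedFinishingReason {
    END_OF_DATA(true),
    STOP_WITH_SAVEPOINT_DRAIN(true),
    STOP_WITH_SAVEPOINT_NO_DRAIN(false),
    CANCELED(false), // assumed new value covering isCanceled()
    FAILED(false); // assumed new value covering isFailing()

    private final boolean shouldCallFinish;

    UnifiedFinishingReason(boolean shouldCallFinish) {
        this.shouldCallFinish = shouldCallFinish;
    }

    boolean shouldCallFinish() {
        return shouldCallFinish;
    }
}
```

With something like that, the check in `completeProcessing()` could reduce to
`finishingReason.shouldCallFinish()`.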
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,15 +320,50 @@ protected void declineCheckpoint(long checkpointId) {
        public void run() {
            try {
                mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                }
+                completeProcessing();
                completionFuture.complete(null);
            } catch (Throwable t) {
                // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_DRAIN
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored (e.g. Kinesis case see FLINK-23528)
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN) {
+                    // swallow all exceptions if the source was stopped without drain
+                    completionFuture.complete(null);
+                } else {
+                    completionFuture.completeExceptionally(t);
+                }
+            }
+        }
+
+        private void completeProcessing() throws InterruptedException, ExecutionException {
+            if (finishingReason.shouldCallFinish() && !isCanceled() && !isFailing()) {
+                CompletableFuture<Void> endOfDataConsumed = new CompletableFuture<>();
+                mainMailboxExecutor.execute(
+                        () -> {
+                            // theoretically the StreamSource can implement BoundedOneInput, so we
+                            // need to call it here
+                            operatorChain.endInput(1);
+                            endData();
+                            endOfDataConsumed.complete(null);
+                        },
+                        "SourceStreamTask finished processing data.");
+                endOfDataConsumed.get();
Review comment:
`mainMailboxExecutor.submit().get()`?
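A rough sketch of what I mean, reusing the fields from the diff above
(`MailboxExecutor#submit` returns a future that completes once the enqueued
action has run in the mailbox thread); untested:

```java
// Sketch only: let submit() provide the future instead of the hand-rolled
// endOfDataConsumed CompletableFuture.
mainMailboxExecutor
        .submit(
                () -> {
                    // theoretically the StreamSource can implement BoundedOneInput,
                    // so we need to call it here
                    operatorChain.endInput(1);
                    endData();
                },
                "SourceStreamTask finished processing data.")
        .get();
```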
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,15 +320,50 @@ protected void declineCheckpoint(long checkpointId) {
        public void run() {
            try {
                mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                }
+                completeProcessing();
                completionFuture.complete(null);
            } catch (Throwable t) {
                // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_DRAIN
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored (e.g. Kinesis case see FLINK-23528)
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }
Review comment:
Do we need to handle this case here? Can't we just ignore it and treat it as
a bug?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -261,6 +277,29 @@ private void interruptSourceThread(boolean interrupt) {
        }
    }
+    private CompletableFuture<Boolean> triggerStopWithSavepointWithDrain(
Review comment:
nit: `triggerStopWithSavepointWithDrainAsync`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -472,40 +486,60 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                                new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
    }
-    private void resetSynchronousSavepointId(long id, boolean succeeded) {
-        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
-            // allow to process further EndOfPartition events
-            activeSyncSavepointId = null;
-            operatorChain.setIgnoreEndOfInput(false);
+    protected void endData() throws Exception {
+        advanceToEndOfEventTime();
+        // finish all operators in a chain effect way
+        operatorChain.finishOperators(actionExecutor);
+
+        for (ResultPartitionWriter partitionWriter : getEnvironment().getAllWriters()) {
+            partitionWriter.notifyEndOfData();
        }
-        syncSavepointId = null;
+
+        this.endOfDataReceived = true;
    }
Review comment:
You are asynchronously enqueuing `endData()` in a couple of different places.
I have a feeling that if there is a race between the mailbox action executing
and the (source?) operator finishing on its own, `endData()` will end up being
called twice, and this method doesn't handle that well.
I also do not like that we are calling `endInput()` in so many places.
Why don't we have a single common code path for calling `endInput()` when the
task finishes, and just allow different ways for the task to finish? In this
regard I think the previous behaviour in `afterInvoke()` was better.
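If multiple call sites do stay for now, at a minimum the method could be made
idempotent. Just a sketch, the `endOfDataHandled` flag is not in the PR and it
assumes every caller runs in the mailbox thread:

```java
// Sketch only: guard against endData() running its side effects twice if the
// enqueued mailbox action races with the operator finishing on its own.
// Assumes all callers run in the mailbox thread, so a plain boolean suffices.
private boolean endOfDataHandled = false;

protected void endData() throws Exception {
    if (endOfDataHandled) {
        return;
    }
    endOfDataHandled = true;

    advanceToEndOfEventTime();
    // finish all operators in a chain effect way
    operatorChain.finishOperators(actionExecutor);

    for (ResultPartitionWriter partitionWriter : getEnvironment().getAllWriters()) {
        partitionWriter.notifyEndOfData();
    }

    this.endOfDataReceived = true;
}
```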
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
        this.endOfDataReceived = true;
    }
-    private void resetSynchronousSavepointId(long id, boolean succeeded) {
-        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
-            // allow to process further EndOfPartition events
-            activeSyncSavepointId = null;
-            operatorChain.setIgnoreEndOfInput(false);
-        }
-        syncSavepointId = null;
-    }
-
-    private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+    protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
        checkState(
-                syncSavepointId == null,
+                syncSavepointWithoutDrain == null
+                        && (syncSavepointWithDrain == null
+                                || (isDrain && syncSavepointWithDrain == checkpointId)),
                "at most one stop-with-savepoint checkpoint at a time is allowed");
-        syncSavepointId = checkpointId;
-        activeSyncSavepointId = checkpointId;
-        operatorChain.setIgnoreEndOfInput(ignoreEndOfInput);
+        if (isDrain) {
+            if (syncSavepointWithDrain == null) {
+                syncSavepointWithDrain = checkpointId;
+                savepointCompletedFuture = new CompletableFuture<>();
+            }
+        } else {
+            syncSavepointWithoutDrain = checkpointId;
+        }
    }
    @VisibleForTesting
    OptionalLong getSynchronousSavepointId() {
-        return syncSavepointId != null ? OptionalLong.of(syncSavepointId) : OptionalLong.empty();
+        if (syncSavepointWithoutDrain != null) {
+            return OptionalLong.of(syncSavepointWithoutDrain);
+        } else if (syncSavepointWithDrain != null) {
+            return OptionalLong.of(syncSavepointWithDrain);
+        } else {
+            return OptionalLong.empty();
+        }
+    }
+
+    private boolean isCurrentSavepointWithDrain(long checkpointId) {
+        return syncSavepointWithDrain != null && syncSavepointWithDrain == checkpointId;
Review comment:
As you prefer
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]