This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bf16a7d [FLINK-20079][task] Initialize operator chain before upstream partition request
bf16a7d is described below
commit bf16a7dd2a2786847abd440c69ab8ade59853a1d
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Oct 29 17:51:49 2020 +0100
[FLINK-20079][task] Initialize operator chain before upstream partition request
---
.../flink/streaming/runtime/tasks/StreamTask.java | 37 ++++++++++------------
.../checkpointing/UnalignedCheckpointITCase.java | 11 +++++--
2 files changed, 25 insertions(+), 23 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c33772e..0eef744 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -499,32 +499,29 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// we need to make sure that any triggers scheduled in open()
cannot be
// executed before all operators are opened
actionExecutor.runThrowing(() -> {
- // both the following operations are protected by the
lock
- // so that we avoid race conditions in the case that
initializeState()
- // registers a timer, that fires before the open() is
called.
- readRecoveredChannelState(); // WARN: should be done
before operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+
+ SequentialChannelStateReader reader =
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
+ reader.readOutputData(getEnvironment().getAllWriters(),
!configuration.isGraphContainingLoops());
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
- });
- isRunning = true;
- }
+ channelIOExecutor.execute(() -> {
+ try {
+
reader.readInputData(getEnvironment().getAllInputGates());
+ } catch (Exception e) {
+
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
+ }
+ });
- private void readRecoveredChannelState() throws IOException,
InterruptedException {
- SequentialChannelStateReader reader =
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
- reader.readOutputData(getEnvironment().getAllWriters(),
!configuration.isGraphContainingLoops());
- channelIOExecutor.execute(() -> {
- try {
-
reader.readInputData(getEnvironment().getAllInputGates());
- } catch (Exception e) {
-
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
+ for (InputGate inputGate :
getEnvironment().getAllInputGates()) {
+ inputGate
+ .getStateConsumedFuture()
+ .thenRun(() ->
mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request
partitions"));
}
+
});
- for (InputGate inputGate : getEnvironment().getAllInputGates())
{
- inputGate
- .getStateConsumedFuture()
- .thenRun(() ->
mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request
partitions"));
- }
+
+ isRunning = true;
}
@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 5a036db..239243c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -80,6 +80,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -211,7 +212,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
final LocalStreamEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
env.enableCheckpointing(100);
- env.getCheckpointConfig().setAlignmentTimeout(0);
+ env.getCheckpointConfig().setAlignmentTimeout(1);
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(EXPECTED_FAILURES,
Time.milliseconds(100)));
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
@@ -462,6 +463,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
private ListState<State> stateList;
private State state;
private final long minCheckpoints;
+ private Random random = new Random();
private VerifyingSink(long minCheckpoints) {
this.minCheckpoints = minCheckpoints;
@@ -470,6 +472,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
+ random = new Random();
getRuntimeContext().addAccumulator(NUM_OUTPUTS,
numOutputCounter);
getRuntimeContext().addAccumulator(NUM_OUT_OF_ORDER,
outOfOrderCounter);
getRuntimeContext().addAccumulator(NUM_DUPLICATES,
duplicatesCounter);
@@ -531,8 +534,10 @@ public class UnalignedCheckpointITCase extends TestLogger {
state.numOutput++;
if (state.completedCheckpoints < minCheckpoints) {
- // induce heavy backpressure until enough
checkpoints have been written
- Thread.sleep(0, 100_000);
+ // induce backpressure until enough checkpoints
have been written
+ if (random.nextInt(1000) == 42) {
+ Thread.sleep(1);
+ }
}
// after all checkpoints have been completed, the
remaining data should be flushed out fairly quickly
}