This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 434f59f53c8fe5cd47406472cd0ddf9051081f18 Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Jun 26 14:20:48 2019 +0200 [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest --- .../tasks/StreamTaskCancellationBarrierTest.java | 1 + .../streaming/runtime/tasks/StreamTaskTest.java | 52 ++++++++++++++++------ 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index 21427e0..92cf60b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -185,6 +185,7 @@ public class StreamTaskCancellationBarrierTest { @Override protected void init() throws Exception { + super.init(); synchronized (lock) { while (running) { lock.wait(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 949a75f..26c6faf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -108,6 +108,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.runtime.io.StreamInputProcessor; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; @@ -145,6 +146,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning; +import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -826,11 +828,8 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void init() throws Exception {} - - @Override - protected void performDefaultAction(DefaultActionContext context) throws Exception { - context.allActionsCompleted(); + protected void init() throws Exception { + inputProcessor = new EmptyInputProcessor(); } @Override @@ -1019,7 +1018,6 @@ public class StreamTaskTest extends TestLogger { private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> { private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain; - private volatile boolean inputFinished; MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { super(env, null, uncaughtExceptionHandler); @@ -1033,20 +1031,47 @@ public class StreamTaskTest extends TestLogger { // here for test purposes. 
super.operatorChain = this.overrideOperatorChain; super.headOperator = super.operatorChain.getHeadOperator(); + super.inputProcessor = new EmptyInputProcessor(false); + } + + void finishInput() { + checkState(inputProcessor != null, "Tried to finishInput before MockStreamTask was started"); + ((EmptyInputProcessor) inputProcessor).finishInput(); + } + } + + private static class EmptyInputProcessor implements StreamInputProcessor { + private volatile boolean isFinished; + + public EmptyInputProcessor() { + this(true); + } + + public EmptyInputProcessor(boolean startFinished) { + isFinished = startFinished; } @Override - protected void performDefaultAction(DefaultActionContext context) { - if (isCanceled() || inputFinished) { - context.allActionsCompleted(); - } + public boolean processInput() throws Exception { + return false; } @Override - protected void cleanup() throws Exception {} + public void close() throws IOException { + } - void finishInput() { - this.inputFinished = true; + @Override + public boolean isFinished() { + return isFinished; + } + + @Override + public CompletableFuture<?> isAvailable() { + return AVAILABLE; + } + + public void finishInput() { + isFinished = true; } } @@ -1262,6 +1287,7 @@ public class StreamTaskTest extends TestLogger { @Override protected void init() throws Exception { + super.init(); getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception {
