[FLINK-8239][tests] StreamTaskTestHarness supports 2-input head operators. This closes #5153.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1665c12 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1665c12 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1665c12 Branch: refs/heads/master Commit: c1665c12b49752af2f4d2c624095dcec432efe8e Parents: 6e89878 Author: zentol <[email protected]> Authored: Mon Dec 11 15:28:07 2017 +0100 Committer: zentol <[email protected]> Committed: Tue Dec 12 19:09:18 2017 +0100 ---------------------------------------------------------------------- .../runtime/tasks/StreamTaskTestHarness.java | 7 +++ .../tasks/StreamTaskTestHarnessTest.java | 46 ++++++++++++++++++++ 2 files changed, 53 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 2700a70..3c8dd0b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; @@ -379,6 +380,12 @@ public class StreamTaskTestHarness<OUT> { return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig()); } + public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, TwoInputStreamOperator<?, ?, ?> headOperator) { + Preconditions.checkState(!setupCalled, "This harness was already setup."); + setupCalled = true; + return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig()); + } + // ------------------------------------------------------------------------ private class TaskThread extends Thread { http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java index 9fba5f8..249a326 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.Assert; @@ -52,6 +53,12 @@ public class StreamTaskTestHarnessTest { } catch (IllegalStateException expected) { // expected } + 
try { + harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator()); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO); harness.setupOperatorChain(new OperatorID(), new TestOperator()) @@ -69,6 +76,35 @@ public class StreamTaskTestHarnessTest { } catch (IllegalStateException expected) { // expected } + try { + harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator()); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + + harness = new StreamTaskTestHarness<>(new TwoInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO); + harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator()) + .chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())); + + try { + harness.setupOutputForSingletonOperatorChain(); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + try { + harness.setupOperatorChain(new OperatorID(), new TestOperator()); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + try { + harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator()); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } } private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { @@ -76,4 +112,14 @@ public class StreamTaskTestHarnessTest { public void processElement(StreamRecord<String> element) throws Exception { } } + + private static class TwoInputTestOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> { + @Override + public void processElement1(StreamRecord<String> element) throws Exception { + } + + @Override + public void processElement2(StreamRecord<String> element) throws Exception { + } + } }
