[FLINK-8239][tests] StreamTaskTestHarness supports 2-input head operators

This closes #5153.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1665c12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1665c12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1665c12

Branch: refs/heads/master
Commit: c1665c12b49752af2f4d2c624095dcec432efe8e
Parents: 6e89878
Author: zentol <[email protected]>
Authored: Mon Dec 11 15:28:07 2017 +0100
Committer: zentol <[email protected]>
Committed: Tue Dec 12 19:09:18 2017 +0100

----------------------------------------------------------------------
 .../runtime/tasks/StreamTaskTestHarness.java    |  7 +++
 .../tasks/StreamTaskTestHarnessTest.java        | 46 ++++++++++++++++++++
 2 files changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 2700a70..3c8dd0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
@@ -379,6 +380,12 @@ public class StreamTaskTestHarness<OUT> {
                return new StreamConfigChainer(headOperatorId, headOperator, 
getStreamConfig());
        }
 
+       public StreamConfigChainer setupOperatorChain(OperatorID 
headOperatorId, TwoInputStreamOperator<?, ?, ?> headOperator) {
+               Preconditions.checkState(!setupCalled, "This harness was 
already setup.");
+               setupCalled = true;
+               return new StreamConfigChainer(headOperatorId, headOperator, 
getStreamConfig());
+       }
+
        // 
------------------------------------------------------------------------
 
        private class TaskThread extends Thread {

http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
index 9fba5f8..249a326 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.junit.Assert;
@@ -52,6 +53,12 @@ public class StreamTaskTestHarnessTest {
                } catch (IllegalStateException expected) {
                        // expected
                }
+               try {
+                       harness.setupOperatorChain(new OperatorID(), new 
TwoInputTestOperator());
+                       Assert.fail();
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
 
                harness = new StreamTaskTestHarness<>(new 
OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
                harness.setupOperatorChain(new OperatorID(), new TestOperator())
@@ -69,6 +76,35 @@ public class StreamTaskTestHarnessTest {
                } catch (IllegalStateException expected) {
                        // expected
                }
+               try {
+                       harness.setupOperatorChain(new OperatorID(), new 
TwoInputTestOperator());
+                       Assert.fail();
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
+
+               harness = new StreamTaskTestHarness<>(new 
TwoInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+               harness.setupOperatorChain(new OperatorID(), new 
TwoInputTestOperator())
+                       .chain(new OperatorID(), new TestOperator(), 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+               try {
+                       harness.setupOutputForSingletonOperatorChain();
+                       Assert.fail();
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
+               try {
+                       harness.setupOperatorChain(new OperatorID(), new 
TestOperator());
+                       Assert.fail();
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
+               try {
+                       harness.setupOperatorChain(new OperatorID(), new 
TwoInputTestOperator());
+                       Assert.fail();
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
        }
 
        private static class TestOperator extends 
AbstractStreamOperator<String> implements OneInputStreamOperator<String, 
String> {
@@ -76,4 +112,14 @@ public class StreamTaskTestHarnessTest {
                public void processElement(StreamRecord<String> element) throws 
Exception {
                }
        }
+
+       private static class TwoInputTestOperator extends 
AbstractStreamOperator<String> implements TwoInputStreamOperator<String, 
String, String> {
+               @Override
+               public void processElement1(StreamRecord<String> element) 
throws Exception {
+               }
+
+               @Override
+               public void processElement2(StreamRecord<String> element) 
throws Exception {
+               }
+       }
 }

Reply via email to