[FLINK-8238][tests] Forbid multiple setups of StreamTaskTestHarness
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e898781 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e898781 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e898781 Branch: refs/heads/master Commit: 6e89878166c0b8b8193ae69a675b6d085ffa9fe7 Parents: 5a545db Author: zentol <[email protected]> Authored: Mon Dec 11 15:26:55 2017 +0100 Committer: zentol <[email protected]> Committed: Tue Dec 12 19:09:17 2017 +0100 ---------------------------------------------------------------------- .../runtime/tasks/StreamTaskTestHarness.java | 7 ++ .../tasks/StreamTaskTestHarnessTest.java | 79 ++++++++++++++++++++ 2 files changed, 86 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6e898781/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 1187d66..2700a70 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.util.Preconditions; import org.junit.Assert; @@ -89,6 +90,8 @@ public class StreamTaskTestHarness<OUT> { protected int numInputGates; protected int numInputChannelsPerGate; + private boolean setupCalled = false; + @SuppressWarnings("rawtypes") protected StreamTestSingleInputGate[] inputGates; @@ -134,6 +137,8 @@ public class StreamTaskTestHarness<OUT> { * please manually configure the stream config. */ public void setupOutputForSingletonOperatorChain() { + Preconditions.checkState(!setupCalled, "This harness was already setup."); + setupCalled = true; streamConfig.setChainStart(); streamConfig.setBufferTimeout(0); streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); @@ -369,6 +374,8 @@ public class StreamTaskTestHarness<OUT> { } public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, OneInputStreamOperator<?, ?> headOperator) { + Preconditions.checkState(!setupCalled, "This harness was already setup."); + setupCalled = true; return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig()); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e898781/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java new file mode 100644 index 0000000..9fba5f8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the {@link StreamTaskTestHarness}. + */ +public class StreamTaskTestHarnessTest { + + @Test + public void testMultipleSetupsThrowsException() { + StreamTaskTestHarness<String> harness; + + harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO); + harness.setupOutputForSingletonOperatorChain(); + + try { + harness.setupOutputForSingletonOperatorChain(); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + try { + harness.setupOperatorChain(new OperatorID(), new TestOperator()); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + + harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO); + harness.setupOperatorChain(new OperatorID(), new TestOperator()) + .chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())); + + try { + harness.setupOutputForSingletonOperatorChain(); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + try { + harness.setupOperatorChain(new OperatorID(), new TestOperator()); + Assert.fail(); + } catch (IllegalStateException expected) { + // expected + } + } + + private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { + @Override + public void processElement(StreamRecord<String> element) throws Exception { + } + } +}
