[FLINK-4877] Refactor Operator TestHarnesses to use Common Base Class

This also introduces KeyedTwoInputStreamOperatorTestHarness, which is similar to KeyedOneInputStreamOperatorTestHarness.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f305baab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f305baab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f305baab Branch: refs/heads/master Commit: f305baabbcd9b035b696b8b7a4334deaf1b28d23 Parents: 0859a69 Author: Aljoscha Krettek <[email protected]> Authored: Thu Sep 29 16:04:29 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 19:03:05 2016 +0200 ---------------------------------------------------------------------- .../runtime/streamrecord/StreamRecord.java | 2 +- .../util/AbstractStreamOperatorTestHarness.java | 366 +++++++++++++++++++ .../KeyedOneInputStreamOperatorTestHarness.java | 18 +- .../KeyedTwoInputStreamOperatorTestHarness.java | 144 ++++++++ .../util/OneInputStreamOperatorTestHarness.java | 328 +---------------- .../util/TwoInputStreamOperatorTestHarness.java | 130 +------ 6 files changed, 537 insertions(+), 451 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java index 9f75161..da606a9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java @@ -171,7 +171,7 @@ public final class StreamRecord<T> extends StreamElement { else if (o != null && getClass() == o.getClass()) { StreamRecord<?> that = (StreamRecord<?>) o; return this.hasTimestamp == 
that.hasTimestamp && - this.timestamp == that.timestamp && + (!this.hasTimestamp || this.timestamp == that.timestamp) && (this.value == null ? that.value == null : this.value.equals(that.value)); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java new file mode 100644 index 0000000..a61d995 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ClosableRegistry; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.mockito.invocation.InvocationOnMock; +import 
org.mockito.stubbing.Answer; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + * Base class for {@code AbstractStreamOperator} test harnesses. + */ +public class AbstractStreamOperatorTestHarness<OUT> { + + public static final int MAX_PARALLELISM = 10; + + final protected StreamOperator<OUT> operator; + + final protected ConcurrentLinkedQueue<Object> outputList; + + final protected StreamConfig config; + + final protected ExecutionConfig executionConfig; + + final protected TestProcessingTimeService processingTimeService; + + final protected StreamTask<?, ?> mockTask; + + ClosableRegistry closableRegistry; + + // use this as default for tests + protected AbstractStateBackend stateBackend = new MemoryStateBackend(); + + private final Object checkpointLock; + + /** + * Whether setup() was called on the operator. This is reset when calling close(). 
+ */ + private boolean setupCalled = false; + private boolean initializeCalled = false; + + private volatile boolean wasFailedExternally = false; + + public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) throws Exception { + this(operator, new ExecutionConfig()); + } + + public AbstractStreamOperatorTestHarness( + StreamOperator<OUT> operator, + ExecutionConfig executionConfig) throws Exception { + this.operator = operator; + this.outputList = new ConcurrentLinkedQueue<>(); + Configuration underlyingConfig = new Configuration(); + this.config = new StreamConfig(underlyingConfig); + this.config.setCheckpointingEnabled(true); + this.executionConfig = executionConfig; + this.closableRegistry = new ClosableRegistry(); + this.checkpointLock = new Object(); + + final Environment env = new MockEnvironment( + "MockTask", + 3 * 1024 * 1024, + new MockInputSplitProvider(), + 1024, + underlyingConfig, + executionConfig, + MAX_PARALLELISM, + 1, 0); + + mockTask = mock(StreamTask.class); + processingTimeService = new TestProcessingTimeService(); + processingTimeService.setCurrentTime(0); + + when(mockTask.getName()).thenReturn("Mock Task"); + when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); + when(mockTask.getConfiguration()).thenReturn(config); + when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); + when(mockTask.getEnvironment()).thenReturn(env); + when(mockTask.getExecutionConfig()).thenReturn(executionConfig); + when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); + when(mockTask.getCancelables()).thenReturn(this.closableRegistry); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + wasFailedExternally = true; + return null; + } + }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class)); + + try { + doAnswer(new Answer<CheckpointStreamFactory>() { + @Override + public CheckpointStreamFactory 
answer(InvocationOnMock invocationOnMock) throws Throwable { + + final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0]; + return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName()); + } + }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class)); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + try { + doAnswer(new Answer<OperatorStateBackend>() { + @Override + public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0]; + final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1]; + OperatorStateBackend osb; + if (null == stateHandles) { + osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName()); + } else { + osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles); + } + mockTask.getCancelables().registerClosable(osb); + return osb; + } + }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class)); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + doAnswer(new Answer<ProcessingTimeService>() { + @Override + public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable { + return processingTimeService; + } + }).when(mockTask).getProcessingTimeService(); + } + + public void setStateBackend(AbstractStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + public Object getCheckpointLock() { + return mockTask.getCheckpointLock(); + } + + public Environment getEnvironment() { + return this.mockTask.getEnvironment(); + } + + /** + * Get all the output from the task. This contains StreamRecords and Events interleaved. 
+ */ + public ConcurrentLinkedQueue<Object> getOutput() { + return outputList; + } + + /** + * Get all StreamRecords from the task output. The output buffer is NOT cleared. + * Non-record elements (e.g. watermarks) are skipped. + */ + @SuppressWarnings("unchecked") + public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() { + List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>(); + for (Object e: getOutput()) { + if (e instanceof StreamRecord) { + resultElements.add((StreamRecord<OUT>) e); + } + } + return resultElements; + } + + /** + * Calls + * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}. + */ + public void setup() throws Exception { + operator.setup(mockTask, config, new MockOutput()); + setupCalled = true; + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} + * if it was not called before. + */ + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { + if (!setupCalled) { + setup(); + } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + + /** + * Calls {@link StreamOperator#open()}. This first calls + * {@link #initializeState(OperatorStateHandles)} with {@code null} (which in turn calls setup if needed) + * if it was not called before. + */ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } + operator.open(); + } + + /** + * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
+ */ + public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception { + + CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory( + new JobID(), + "test_op"); + + return operator.snapshotState(checkpointId, timestamp, streamFactory); + } + + /** + * Calls {@link StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if + * the operator implements this interface; otherwise throws a RuntimeException. + */ + @Deprecated + public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception { + + CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( + new JobID(), + "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); + if(operator instanceof StreamCheckpointedOperator) { + ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); + return outStream.closeAndGetHandle(); + } else { + throw new RuntimeException("Operator is not StreamCheckpointedOperator"); + } + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}. + */ + public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { + operator.notifyOfCompletedCheckpoint(checkpointId); + } + + /** + * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if + * the operator implements this interface; otherwise throws a RuntimeException. + */ @Deprecated + public void restore(StreamStateHandle snapshot) throws Exception { + if(operator instanceof StreamCheckpointedOperator) { + try (FSDataInputStream in = snapshot.openInputStream()) { + ((StreamCheckpointedOperator) operator).restoreState(in); + } + } else { + throw new RuntimeException("Operator is not StreamCheckpointedOperator"); + } + } + + /** + * Calls close and dispose on the operator.
+ */ + public void close() throws Exception { + operator.close(); + operator.dispose(); + if (processingTimeService != null) { + processingTimeService.shutdownService(); + } + setupCalled = false; + } + + public void setProcessingTime(long time) throws Exception { + processingTimeService.setCurrentTime(time); + } + + public long getProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + this.config.setTimeCharacteristic(timeCharacteristic); + } + + public TimeCharacteristic getTimeCharacteristic() { + return this.config.getTimeCharacteristic(); + } + + public boolean wasFailedExternally() { + return wasFailedExternally; + } + + private class MockOutput implements Output<StreamRecord<OUT>> { + + private TypeSerializer<OUT> outputSerializer; + + @Override + public void emitWatermark(Watermark mark) { + outputList.add(mark); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + outputList.add(latencyMarker); + } + + @Override + public void collect(StreamRecord<OUT> element) { + if (outputSerializer == null) { + outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); + } + if (element.hasTimestamp()) { + outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp())); + } else { + outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()))); + } + } + + @Override + public void close() { + // ignore + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 9c9d11b..99527e7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -50,7 +50,6 @@ import static org.mockito.Mockito.doAnswer; /** * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get * a {@link KeyedStateBackend}. - * */ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> extends OneInputStreamOperatorTestHarness<IN, OUT> { @@ -171,7 +170,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } /** - * + * */ @Override public void restore(StreamStateHandle snapshot) throws Exception { @@ -189,21 +188,12 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } - /** - * Calls close and dispose on the operator. 
- */ - public void close() throws Exception { - super.close(); - if (keyedStateBackend != null) { - keyedStateBackend.dispose(); - } - } - @Override public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { - if (null != operatorStateHandles) { - this.restoredKeyedState = operatorStateHandles.getManagedKeyedState(); + if (operatorStateHandles != null) { + restoredKeyedState = operatorStateHandles.getManagedKeyedState(); } + super.initializeState(operatorStateHandles); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java new file mode 100644 index 0000000..2e9885c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collection; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; + +/** + * Extension of {@link TwoInputStreamOperatorTestHarness} that allows the operator to get + * a {@link KeyedStateBackend}. 
+ */ +public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> + extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { + + // in case the operator creates one we store it here so that we + // can snapshot its state + private AbstractKeyedStateBackend<?> keyedStateBackend = null; + + // when we restore we keep the state here so that we can call restore + // when the operator requests the keyed state backend + private Collection<KeyGroupsStateHandle> restoredKeyedState = null; + + public KeyedTwoInputStreamOperatorTestHarness( + TwoInputStreamOperator<IN1, IN2, OUT> operator, + final KeySelector<IN1, K> keySelector1, + final KeySelector<IN2, K> keySelector2, + TypeInformation<K> keyType) throws Exception { + super(operator); + + ClosureCleaner.clean(keySelector1, false); + ClosureCleaner.clean(keySelector2, false); + config.setStatePartitioner(0, keySelector1); + config.setStatePartitioner(1, keySelector2); + config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.setNumberOfKeyGroups(MAX_PARALLELISM); + + setupMockTaskCreateKeyedBackend(); + } + + public KeyedTwoInputStreamOperatorTestHarness( + TwoInputStreamOperator<IN1, IN2, OUT> operator, + ExecutionConfig executionConfig, + KeySelector<IN1, K> keySelector1, + KeySelector<IN2, K> keySelector2, + TypeInformation<K> keyType) throws Exception { + super(operator, executionConfig); + + ClosureCleaner.clean(keySelector1, false); + ClosureCleaner.clean(keySelector2, false); + config.setStatePartitioner(0, keySelector1); + config.setStatePartitioner(1, keySelector2); + config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.setNumberOfKeyGroups(MAX_PARALLELISM); + + setupMockTaskCreateKeyedBackend(); + } + + private void setupMockTaskCreateKeyedBackend() { + + try { + doAnswer(new Answer<KeyedStateBackend>() { + @Override + public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + + final TypeSerializer keySerializer = 
(TypeSerializer) invocationOnMock.getArguments()[0]; + final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1]; + final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2]; + + if(keyedStateBackend != null) { + keyedStateBackend.close(); + } + + if (restoredKeyedState == null) { + keyedStateBackend = stateBackend.createKeyedStateBackend( + mockTask.getEnvironment(), + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + mockTask.getEnvironment().getTaskKvStateRegistry()); + return keyedStateBackend; + } else { + keyedStateBackend = stateBackend.restoreKeyedStateBackend( + mockTask.getEnvironment(), + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + restoredKeyedState, + mockTask.getEnvironment().getTaskKvStateRegistry()); + restoredKeyedState = null; + return keyedStateBackend; + } + } + }).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), any(KeyGroupRange.class)); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { + if (operatorStateHandles != null) { // keep keyed state so the mocked createKeyedStateBackend can restore it + restoredKeyedState = operatorStateHandles.getManagedKeyedState(); + } + + super.initializeState(operatorStateHandles); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 5b277bf..a3e095a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -18,89 +18,23 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.ClosableRegistry; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.util.InstantiationUtil; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.RunnableFuture; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * A test harness for testing a {@link OneInputStreamOperator}. * - * <p> - * This mock task provides the operator with a basic runtime context and allows pushing elements + * <p>This mock task provides the operator with a basic runtime context and allows pushing elements * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements * and watermarks can be retrieved. You are free to modify these. */ -public class OneInputStreamOperatorTestHarness<IN, OUT> { - - public static final int MAX_PARALLELISM = 10; - - final OneInputStreamOperator<IN, OUT> operator; - - final ConcurrentLinkedQueue<Object> outputList; - - final StreamConfig config; - - final ExecutionConfig executionConfig; - - final TestProcessingTimeService processingTimeService; - - StreamTask<?, ?> mockTask; - - ClosableRegistry closableRegistry; - - // use this as default for tests - AbstractStateBackend stateBackend = new MemoryStateBackend(); +public class OneInputStreamOperatorTestHarness<IN, OUT> + extends AbstractStreamOperatorTestHarness<OUT> { - private final Object checkpointLock; - - /** - * Whether setup() was called on the operator. This is reset when calling close(). 
- */ - private boolean setupCalled = false; - private boolean initializeCalled = false; - - private volatile boolean wasFailedExternally = false; + private final OneInputStreamOperator<IN, OUT> oneInputOperator; public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception { this(operator, new ExecutionConfig()); @@ -109,268 +43,24 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) throws Exception { - this.operator = operator; - this.outputList = new ConcurrentLinkedQueue<>(); - Configuration underlyingConfig = new Configuration(); - this.config = new StreamConfig(underlyingConfig); - this.config.setCheckpointingEnabled(true); - this.executionConfig = executionConfig; - this.closableRegistry = new ClosableRegistry(); - - this.checkpointLock = new Object(); - - final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0); - mockTask = mock(StreamTask.class); - processingTimeService = new TestProcessingTimeService(); - processingTimeService.setCurrentTime(0); - - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); - when(mockTask.getConfiguration()).thenReturn(config); - when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); - when(mockTask.getCancelables()).thenReturn(this.closableRegistry); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - wasFailedExternally = true; - return null; - } - }).when(mockTask).handleAsyncException(any(String.class), 
any(Throwable.class)); - - try { - doAnswer(new Answer<CheckpointStreamFactory>() { - @Override - public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable { - - final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0]; - return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName()); - } - }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class)); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - - try { - doAnswer(new Answer<OperatorStateBackend>() { - @Override - public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { - final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0]; - final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1]; - OperatorStateBackend osb; - if (null == stateHandles) { - osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName()); - } else { - osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles); - } - mockTask.getCancelables().registerClosable(osb); - return osb; - } - }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class)); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - - doAnswer(new Answer<ProcessingTimeService>() { - @Override - public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable { - return processingTimeService; - } - }).when(mockTask).getProcessingTimeService(); - } - - public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { - this.config.setTimeCharacteristic(timeCharacteristic); - } - - public TimeCharacteristic getTimeCharacteristic() { - return this.config.getTimeCharacteristic(); - } - - public boolean wasFailedExternally() { - return wasFailedExternally; - } - - 
public void setStateBackend(AbstractStateBackend stateBackend) { - this.stateBackend = stateBackend; - } - - public Object getCheckpointLock() { - return mockTask.getCheckpointLock(); - } - - public Environment getEnvironment() { - return this.mockTask.getEnvironment(); - } - - /** - * Get all the output from the task. This contains StreamRecords and Events interleaved. - */ - public ConcurrentLinkedQueue<Object> getOutput() { - return outputList; - } - - /** - * Get all the output from the task and clear the output buffer. - * This contains only StreamRecords. - */ - @SuppressWarnings("unchecked") - public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() { - List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>(); - for (Object e: getOutput()) { - if (e instanceof StreamRecord) { - resultElements.add((StreamRecord<OUT>) e); - } - } - return resultElements; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} - */ - public void setup() throws Exception { - operator.setup(mockTask, config, new MockOutput()); - setupCalled = true; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} - * if it was not called before. - */ - public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { - if (!setupCalled) { - setup(); - } - operator.initializeState(operatorStateHandles); - initializeCalled = true; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it - * was not called before. 
- */ - public void open() throws Exception { - if (!initializeCalled) { - initializeState(null); - } - operator.open(); - } - - /** - * - */ - public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception { - - CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory( - new JobID(), - "test_op"); - - return operator.snapshotState(checkpointId, timestamp, streamFactory); - } - - /** - * - */ - @Deprecated - public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception { - - CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( - new JobID(), - "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); - if(operator instanceof StreamCheckpointedOperator) { - ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); - return outStream.closeAndGetHandle(); - } else { - throw new RuntimeException("Operator is not StreamCheckpointedOperator"); - } - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()} - */ - public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { - operator.notifyOfCompletedCheckpoint(checkpointId); - } + super(operator, executionConfig); - /** - * - */ - @Deprecated - public void restore(StreamStateHandle snapshot) throws Exception { - if(operator instanceof StreamCheckpointedOperator) { - try (FSDataInputStream in = snapshot.openInputStream()) { - ((StreamCheckpointedOperator) operator).restoreState(in); - } - } else { - throw new RuntimeException("Operator is not StreamCheckpointedOperator"); - } - } - - /** - * Calls close and dispose on the operator. 
- */ - public void close() throws Exception { - operator.close(); - operator.dispose(); - if (processingTimeService != null) { - processingTimeService.shutdownService(); - } - setupCalled = false; + this.oneInputOperator = operator; } public void processElement(StreamRecord<IN> element) throws Exception { operator.setKeyContextElement1(element); - operator.processElement(element); + oneInputOperator.processElement(element); } public void processElements(Collection<StreamRecord<IN>> elements) throws Exception { for (StreamRecord<IN> element: elements) { operator.setKeyContextElement1(element); - operator.processElement(element); - } - } - - public void setProcessingTime(long time) throws Exception { - synchronized (checkpointLock) { - processingTimeService.setCurrentTime(time); + oneInputOperator.processElement(element); } } public void processWatermark(Watermark mark) throws Exception { - operator.processWatermark(mark); - } - - private class MockOutput implements Output<StreamRecord<OUT>> { - - private TypeSerializer<OUT> outputSerializer; - - @Override - public void emitWatermark(Watermark mark) { - outputList.add(mark); - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - outputList.add(latencyMarker); - } - - @Override - public void collect(StreamRecord<OUT> element) { - if (outputSerializer == null) { - outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); - } - outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()), - element.getTimestamp())); - } - - @Override - public void close() { - // ignore - } + oneInputOperator.processWatermark(mark); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 7df6848..95eea98 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -19,26 +19,9 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.ClosableRegistry; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -import java.util.concurrent.ConcurrentLinkedQueue; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * A test harness for testing a {@link TwoInputStreamOperator}. @@ -48,122 +31,35 @@ import static org.mockito.Mockito.when; * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements * and watermarks can be retrieved. you are free to modify these. 
*/ -public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { - - TwoInputStreamOperator<IN1, IN2, OUT> operator; - - final ConcurrentLinkedQueue<Object> outputList; - - final ExecutionConfig executionConfig; - - final Object checkpointLock; +public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStreamOperatorTestHarness<OUT> { - final ClosableRegistry closableRegistry; + private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator; - boolean initializeCalled = false; - - public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) { - this(operator, new StreamConfig(new Configuration())); + public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception { + this(operator, new ExecutionConfig()); } - public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) { - this.operator = operator; - this.outputList = new ConcurrentLinkedQueue<Object>(); - this.executionConfig = new ExecutionConfig(); - this.checkpointLock = new Object(); - this.closableRegistry = new ClosableRegistry(); - - Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024); - StreamTask<?, ?> mockTask = mock(StreamTask.class); - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); - when(mockTask.getConfiguration()).thenReturn(config); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getCancelables()).thenReturn(this.closableRegistry); - - operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput()); - } - - /** - * Get all the output from the task. This contains StreamRecords and Events interleaved. 
Use - * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)} - * to extract only the StreamRecords. - */ - public ConcurrentLinkedQueue<Object> getOutput() { - return outputList; - } + public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception { + super(operator, executionConfig); - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. - */ - public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { - operator.initializeState(operatorStateHandles); - initializeCalled = true; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. - */ - public void open() throws Exception { - if(!initializeCalled) { - initializeState(mock(OperatorStateHandles.class)); - } - - operator.open(); - } - - /** - * Calls close on the operator. - */ - public void close() throws Exception { - operator.close(); + this.twoInputOperator = operator; } public void processElement1(StreamRecord<IN1> element) throws Exception { - operator.processElement1(element); + twoInputOperator.setKeyContextElement1(element); + twoInputOperator.processElement1(element); } public void processElement2(StreamRecord<IN2> element) throws Exception { - operator.processElement2(element); + twoInputOperator.setKeyContextElement2(element); + twoInputOperator.processElement2(element); } public void processWatermark1(Watermark mark) throws Exception { - operator.processWatermark1(mark); + twoInputOperator.processWatermark1(mark); } public void processWatermark2(Watermark mark) throws Exception { - operator.processWatermark2(mark); - } - - private class MockOutput implements Output<StreamRecord<OUT>> { - - private TypeSerializer<OUT> outputSerializer; - - @Override - @SuppressWarnings("unchecked") - public void emitWatermark(Watermark mark) { - outputList.add(mark); - 
} - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - outputList.add(latencyMarker); - } - - @Override - @SuppressWarnings("unchecked") - public void collect(StreamRecord<OUT> element) { - if (outputSerializer == null) { - outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); - } - outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()), - element.getTimestamp())); - } - - @Override - public void close() { - // ignore - } + twoInputOperator.processWatermark2(mark); } }
