[hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f64d87 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f64d87 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f64d87 Branch: refs/heads/release-1.5 Commit: e1f64d8701535e265f3151d1a9838d01031023ea Parents: 7b680c9 Author: Piotr Nowojski <[email protected]> Authored: Wed May 9 11:12:18 2018 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:54:23 2018 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaConsumerBaseTest.java | 13 +- .../kinesis/testutils/TestRuntimeContext.java | 13 +- .../operators/testutils/MockEnvironment.java | 102 +-------------- .../testutils/MockEnvironmentBuilder.java | 125 +++++++++++++++++++ .../operators/testutils/TaskTestBase.java | 8 +- .../source/InputFormatSourceFunctionTest.java | 13 +- .../StreamOperatorSnapshotRestoreTest.java | 29 ++--- .../operators/async/AsyncWaitOperatorTest.java | 13 +- .../operators/StreamOperatorChainingTest.java | 14 +-- .../streaming/runtime/tasks/StreamTaskTest.java | 35 +++--- .../util/AbstractStreamOperatorTestHarness.java | 21 ++-- .../streaming/util/SourceFunctionUtil.java | 15 +-- .../PojoSerializerUpgradeTest.java | 25 ++-- 13 files changed, 224 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index b226ff1..4605015 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -33,10 +33,9 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; @@ -834,12 +833,10 @@ public class FlinkKafkaConsumerBaseTest { super( new MockStreamOperator(), - new MockEnvironment( - "mockTask", - 4 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager()), + new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build(), Collections.emptyMap()); this.isCheckpointingEnabled = isCheckpointingEnabled; http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java index ce0bd97..740d2f2 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java @@ -21,8 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -45,12 +44,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext { super( new TestStreamOperator(), - new MockEnvironment( - "mockTask", - 4 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager()), + new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build(), Collections.emptyMap()); this.isCheckpointingEnabled = isCheckpointingEnabled; http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index ce19a5e..4bf94e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateManager; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.types.Record; @@ -109,106 +108,11 @@ public class MockEnvironment implements Environment, AutoCloseable { private Optional<Throwable> actualExternalFailureCause = Optional.empty(); - public MockEnvironment() { - this( - "mock-task", - 1024 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager()); + public static MockEnvironmentBuilder builder() { + return new MockEnvironmentBuilder(); } - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, - TaskStateManager taskStateManager) { - this( - taskName, - memorySize, - inputSplitProvider, - bufferSize, - new Configuration(), - new ExecutionConfig(), - taskStateManager); - } - - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, Configuration taskConfiguration, - ExecutionConfig executionConfig, - TaskStateManager taskStateManager) { - this( - taskName, - memorySize, - inputSplitProvider, - bufferSize, - taskConfiguration, - executionConfig, - taskStateManager, - 1, - 1, - 0); - } - - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, - Configuration taskConfiguration, - ExecutionConfig executionConfig, - TaskStateManager taskStateManager, - int maxParallelism, - int parallelism, - int subtaskIndex) { - this( - taskName, - memorySize, - inputSplitProvider, - bufferSize, - taskConfiguration, - executionConfig, - taskStateManager, - maxParallelism, - parallelism, - subtaskIndex, - Thread.currentThread().getContextClassLoader()); - - } - - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, - Configuration taskConfiguration, - ExecutionConfig executionConfig, - TaskStateManager taskStateManager, - int maxParallelism, - int parallelism, - int subtaskIndex, - ClassLoader userCodeClassLoader) { - this( - new JobID(), - new JobVertexID(), - taskName, - memorySize, - inputSplitProvider, - bufferSize, - taskConfiguration, - executionConfig, - taskStateManager, - maxParallelism, - parallelism, - subtaskIndex, - userCodeClassLoader); - } - - public MockEnvironment( + protected MockEnvironment( JobID jobID, JobVertexID jobVertexID, String taskName, http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java new file mode 100644 index 0000000..dfb10d4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.TestTaskStateManager; + +public class MockEnvironmentBuilder { + private String taskName = "mock-task"; + private long memorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE; + private MockInputSplitProvider inputSplitProvider = null; + private int bufferSize = 16; + private TaskStateManager taskStateManager = new TestTaskStateManager(); + private Configuration taskConfiguration = new Configuration(); + private ExecutionConfig executionConfig = new ExecutionConfig(); + private int maxParallelism = 1; + private int parallelism = 1; + private int subtaskIndex = 0; + private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader(); + private JobID jobID = new JobID(); + private JobVertexID jobVertexID = new JobVertexID(); + + public MockEnvironmentBuilder setTaskName(String taskName) { + this.taskName = taskName; + return this; + } + + public MockEnvironmentBuilder setMemorySize(long memorySize) { + this.memorySize = memorySize; + return this; + } + + public MockEnvironmentBuilder setInputSplitProvider(MockInputSplitProvider inputSplitProvider) { + this.inputSplitProvider = inputSplitProvider; + return this; + } + + public MockEnvironmentBuilder setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public MockEnvironmentBuilder setTaskStateManager(TaskStateManager taskStateManager) { + this.taskStateManager = taskStateManager; + return this; + } + + public MockEnvironmentBuilder setTaskConfiguration(Configuration taskConfiguration) { + this.taskConfiguration = taskConfiguration; + return this; + } + + public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + return this; + } + + public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) { + this.maxParallelism = maxParallelism; + return this; + } + + public MockEnvironmentBuilder setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public MockEnvironmentBuilder setSubtaskIndex(int subtaskIndex) { + this.subtaskIndex = subtaskIndex; + return this; + } + + public MockEnvironmentBuilder setUserCodeClassLoader(ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = userCodeClassLoader; + return this; + } + + public MockEnvironmentBuilder setJobID(JobID jobID) { + this.jobID = jobID; + return this; + } + + public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) { + this.jobVertexID = jobVertexID; + return this; + } + + public MockEnvironment build() { + return new MockEnvironment( + jobID, + jobVertexID, + taskName, + memorySize, + inputSplitProvider, + bufferSize, + taskConfiguration, + executionConfig, + taskStateManager, + maxParallelism, + parallelism, + subtaskIndex, + userCodeClassLoader); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index a40992c..16485ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -54,8 +54,12 @@ public abstract class TaskTestBase extends TestLogger { public void initEnvironment(long memorySize, int bufferSize) { this.memorySize = memorySize; this.inputSplitProvider = new MockInputSplitProvider(); - TestTaskStateManager taskStateManager = new TestTaskStateManager(); - this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize, taskStateManager); + this.mockEnv = new MockEnvironmentBuilder() + .setTaskName("mock task") + .setMemorySize(this.memorySize) + .setInputSplitProvider(this.inputSplitProvider) + .setBufferSize(bufferSize) + .build(); } public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) { http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index 18c8ac5..84a45d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; @@ -66,12 +66,11 @@ public class InputFormatSourceFunctionTest { final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat(); final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class)); - try (MockEnvironment environment = new MockEnvironment( - "no", - 4 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager())) { + try (MockEnvironment environment = + new MockEnvironmentBuilder() + .setTaskName("no") + .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build()) { reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment)); http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 6d011a3..a38ffa6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -27,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; @@ -36,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -148,20 +147,18 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger { LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider); - MockEnvironment mockEnvironment = new MockEnvironment( - jobID, - jobVertexID, - "test", - 1024L * 1024L, - new MockInputSplitProvider(), - 1024 * 1024, - new Configuration(), - new ExecutionConfig(), - new TestTaskStateManager(localRecoveryConfig), - MAX_PARALLELISM, - 1, - subtaskIdx, - getClass().getClassLoader()); + MockEnvironment mockEnvironment = new MockEnvironmentBuilder() + .setJobID(jobID) + .setJobVertexID(jobVertexID) + .setTaskName("test") + .setMemorySize(1024L * 1024L) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024 * 1024) + .setTaskStateManager(new TestTaskStateManager(localRecoveryConfig)) + .setMaxParallelism(MAX_PARALLELISM) + .setSubtaskIndex(subtaskIdx) + .setUserCodeClassLoader(getClass().getClassLoader()) + .build(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 35e2fbd..cdd77d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -650,12 +651,12 @@ public class AsyncWaitOperatorTest extends TestLogger { @Nonnull private MockEnvironment createMockEnvironment() { - return new MockEnvironment( - "foobarTask", - 1024 * 1024L, - new MockInputSplitProvider(), - 4 * 1024, - new TestTaskStateManager()); + return new MockEnvironmentBuilder() + .setTaskName("foobarTask") + .setMemorySize(1024 * 1024L) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(4 * 1024) + .build(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java index e980ab7..fd6a953 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SplitStream; @@ -170,12 +170,12 @@ public class StreamOperatorChainingTest { } private MockEnvironment createMockEnvironment(String taskName) { - return new MockEnvironment( - taskName, - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - new TestTaskStateManager()); + return new MockEnvironmentBuilder() + .setTaskName(taskName) + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .build(); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index caea662..cb31970 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger { TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = new MockEnvironment(); + Environment mockEnvironment = new MockEnvironmentBuilder().build(); StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -402,7 +403,7 @@ public class StreamTaskTest extends TestLogger { final long checkpointId = 42L; final long timestamp = 1L; - MockEnvironment mockEnvironment = new MockEnvironment(); + MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build(); StreamTask<?, ?> streamTask = spy(new EmptyStreamTask(mockEnvironment)); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -505,12 +506,10 @@ public class StreamTaskTest extends TestLogger { null, checkpointResponder); - MockEnvironment mockEnvironment = new MockEnvironment( - "mock-task", - 1024 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - taskStateManager); + MockEnvironment mockEnvironment = new MockEnvironmentBuilder() + .setTaskName("mock-task") + .setTaskStateManager(taskStateManager) + .build(); StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -603,7 +602,7 @@ public class StreamTaskTest extends TestLogger { final OneShotLatch createSubtask = new OneShotLatch(); final OneShotLatch completeSubtask = new OneShotLatch(); - Environment mockEnvironment = spy(new MockEnvironment()); + Environment mockEnvironment = spy(new MockEnvironmentBuilder().build()); whenNew(OperatorSnapshotFinalizer.class). withAnyArguments(). @@ -689,7 +688,7 @@ public class StreamTaskTest extends TestLogger { final long checkpointId = 42L; final long timestamp = 1L; - Environment mockEnvironment = spy(new MockEnvironment()); + Environment mockEnvironment = spy(new MockEnvironmentBuilder().build()); // latch blocks until the async checkpoint thread acknowledges final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); @@ -769,14 +768,14 @@ public class StreamTaskTest extends TestLogger { streamConfig.setStreamOperator(new BlockingCloseStreamOperator()); streamConfig.setOperatorID(new OperatorID()); - try (MockEnvironment mockEnvironment = new MockEnvironment( - "Test Task", - 32L * 1024L, - new MockInputSplitProvider(), - 1, - taskConfiguration, - new ExecutionConfig(), - new TestTaskStateManager())) { + try (MockEnvironment mockEnvironment = + new MockEnvironmentBuilder() + .setTaskName("Test Task") + .setMemorySize(32L * 1024L) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1) + .setTaskConfiguration(taskConfiguration) + .build()) { StreamTask<Void, BlockingCloseStreamOperator> streamTask = new NoOpStreamTask<>(mockEnvironment); final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null); http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index ed2da18..26ad3ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -137,17 +138,15 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { int subtaskIndex) throws Exception { this( operator, - new MockEnvironment( - "MockTask", - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - new Configuration(), - new ExecutionConfig(), - new TestTaskStateManager(), - maxParallelism, - parallelism, - subtaskIndex), + new MockEnvironmentBuilder() + .setTaskName("MockTask") + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .setMaxParallelism(maxParallelism) + .setParallelism(parallelism) + .setSubtaskIndex(subtaskIndex) + .build(), true); } http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index 3f54081..660a333 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -51,12 +51,13 @@ public class SourceFunctionUtil { } private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception { - try (MockEnvironment environment = new MockEnvironment( - "MockTask", - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - new TestTaskStateManager())) { + try (MockEnvironment environment = + new MockEnvironmentBuilder() + .setTaskName("MockTask") + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .build()) { AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class); when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java index aaa96fb..fe56782 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java @@ -37,12 +37,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamMap; @@ -351,18 +351,17 @@ public class PojoSerializerUpgradeTest extends TestLogger { OperatorSubtaskState operatorSubtaskState, Iterable<Long> input) throws Exception { - try (final MockEnvironment environment = new MockEnvironment( - "test task", - 32 * 1024, - new MockInputSplitProvider(), - 256, - taskConfiguration, - executionConfig, - new TestTaskStateManager(), - 16, - 1, - 0, - classLoader)) { + try (final MockEnvironment environment = + new MockEnvironmentBuilder() + .setTaskName("test task") + .setMemorySize(32 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(256) + .setTaskConfiguration(taskConfiguration) + .setExecutionConfig(executionConfig) + .setMaxParallelism(16) + .setUserCodeClassLoader(classLoader) + .build()) { OneInputStreamOperatorTestHarness<Long, Long> harness = null; try {
