[hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment 
constructors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f64d87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f64d87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f64d87

Branch: refs/heads/release-1.5
Commit: e1f64d8701535e265f3151d1a9838d01031023ea
Parents: 7b680c9
Author: Piotr Nowojski <[email protected]>
Authored: Wed May 9 11:12:18 2018 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:54:23 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  13 +-
 .../kinesis/testutils/TestRuntimeContext.java   |  13 +-
 .../operators/testutils/MockEnvironment.java    | 102 +--------------
 .../testutils/MockEnvironmentBuilder.java       | 125 +++++++++++++++++++
 .../operators/testutils/TaskTestBase.java       |   8 +-
 .../source/InputFormatSourceFunctionTest.java   |  13 +-
 .../StreamOperatorSnapshotRestoreTest.java      |  29 ++---
 .../operators/async/AsyncWaitOperatorTest.java  |  13 +-
 .../operators/StreamOperatorChainingTest.java   |  14 +--
 .../streaming/runtime/tasks/StreamTaskTest.java |  35 +++---
 .../util/AbstractStreamOperatorTestHarness.java |  21 ++--
 .../streaming/util/SourceFunctionUtil.java      |  15 +--
 .../PojoSerializerUpgradeTest.java              |  25 ++--
 13 files changed, 224 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b226ff1..4605015 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -33,10 +33,9 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -834,12 +833,10 @@ public class FlinkKafkaConsumerBaseTest {
 
                        super(
                                new MockStreamOperator(),
-                               new MockEnvironment(
-                                       "mockTask",
-                                       4 * MemoryManager.DEFAULT_PAGE_SIZE,
-                                       null,
-                                       16,
-                                       new TestTaskStateManager()),
+                               new MockEnvironmentBuilder()
+                                       .setTaskName("mockTask")
+                                       .setMemorySize(4 * 
MemoryManager.DEFAULT_PAGE_SIZE)
+                                       .build(),
                                Collections.emptyMap());
 
                        this.isCheckpointingEnabled = isCheckpointingEnabled;

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
index ce0bd97..740d2f2 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
@@ -21,8 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
@@ -45,12 +44,10 @@ public class TestRuntimeContext extends 
StreamingRuntimeContext {
 
                super(
                        new TestStreamOperator(),
-                       new MockEnvironment(
-                               "mockTask",
-                               4 * MemoryManager.DEFAULT_PAGE_SIZE,
-                               null,
-                               16,
-                               new TestTaskStateManager()),
+                       new MockEnvironmentBuilder()
+                               .setTaskName("mockTask")
+                               .setMemorySize(4 * 
MemoryManager.DEFAULT_PAGE_SIZE)
+                               .build(),
                        Collections.emptyMap());
 
                this.isCheckpointingEnabled = isCheckpointingEnabled;

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index ce19a5e..4bf94e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
@@ -109,106 +108,11 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
        private Optional<Throwable> actualExternalFailureCause = 
Optional.empty();
 
-       public MockEnvironment() {
-               this(
-                       "mock-task",
-                       1024 * MemoryManager.DEFAULT_PAGE_SIZE,
-                       null,
-                       16,
-                       new TestTaskStateManager());
+       public static MockEnvironmentBuilder builder() {
+               return new MockEnvironmentBuilder();
        }
 
-       public MockEnvironment(
-               String taskName,
-               long memorySize,
-               MockInputSplitProvider inputSplitProvider,
-               int bufferSize,
-               TaskStateManager taskStateManager) {
-               this(
-                       taskName,
-                       memorySize,
-                       inputSplitProvider,
-                       bufferSize,
-                       new Configuration(),
-                       new ExecutionConfig(),
-                       taskStateManager);
-       }
-
-       public MockEnvironment(
-               String taskName,
-               long memorySize,
-               MockInputSplitProvider inputSplitProvider,
-               int bufferSize, Configuration taskConfiguration,
-               ExecutionConfig executionConfig,
-               TaskStateManager taskStateManager) {
-               this(
-                       taskName,
-                       memorySize,
-                       inputSplitProvider,
-                       bufferSize,
-                       taskConfiguration,
-                       executionConfig,
-                       taskStateManager,
-                       1,
-                       1,
-                       0);
-       }
-
-       public MockEnvironment(
-                       String taskName,
-                       long memorySize,
-                       MockInputSplitProvider inputSplitProvider,
-                       int bufferSize,
-                       Configuration taskConfiguration,
-                       ExecutionConfig executionConfig,
-                       TaskStateManager taskStateManager,
-                       int maxParallelism,
-                       int parallelism,
-                       int subtaskIndex) {
-               this(
-                       taskName,
-                       memorySize,
-                       inputSplitProvider,
-                       bufferSize,
-                       taskConfiguration,
-                       executionConfig,
-                       taskStateManager,
-                       maxParallelism,
-                       parallelism,
-                       subtaskIndex,
-                       Thread.currentThread().getContextClassLoader());
-
-       }
-
-       public MockEnvironment(
-                       String taskName,
-                       long memorySize,
-                       MockInputSplitProvider inputSplitProvider,
-                       int bufferSize,
-                       Configuration taskConfiguration,
-                       ExecutionConfig executionConfig,
-                       TaskStateManager taskStateManager,
-                       int maxParallelism,
-                       int parallelism,
-                       int subtaskIndex,
-                       ClassLoader userCodeClassLoader) {
-               this(
-                       new JobID(),
-                       new JobVertexID(),
-                       taskName,
-                       memorySize,
-                       inputSplitProvider,
-                       bufferSize,
-                       taskConfiguration,
-                       executionConfig,
-                       taskStateManager,
-                       maxParallelism,
-                       parallelism,
-                       subtaskIndex,
-                       userCodeClassLoader);
-       }
-
-       public MockEnvironment(
+       protected MockEnvironment(
                JobID jobID,
                JobVertexID jobVertexID,
                String taskName,

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
new file mode 100644
index 0000000..dfb10d4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+
+public class MockEnvironmentBuilder {
+       private String taskName = "mock-task";
+       private long memorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE;
+       private MockInputSplitProvider inputSplitProvider = null;
+       private int bufferSize = 16;
+       private TaskStateManager taskStateManager = new TestTaskStateManager();
+       private Configuration taskConfiguration = new Configuration();
+       private ExecutionConfig executionConfig = new ExecutionConfig();
+       private int maxParallelism = 1;
+       private int parallelism = 1;
+       private int subtaskIndex = 0;
+       private ClassLoader userCodeClassLoader = 
Thread.currentThread().getContextClassLoader();
+       private JobID jobID = new JobID();
+       private JobVertexID jobVertexID = new JobVertexID();
+
+       public MockEnvironmentBuilder setTaskName(String taskName) {
+               this.taskName = taskName;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setMemorySize(long memorySize) {
+               this.memorySize = memorySize;
+               return this;
+       }
+
+       public MockEnvironmentBuilder 
setInputSplitProvider(MockInputSplitProvider inputSplitProvider) {
+               this.inputSplitProvider = inputSplitProvider;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setBufferSize(int bufferSize) {
+               this.bufferSize = bufferSize;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setTaskStateManager(TaskStateManager 
taskStateManager) {
+               this.taskStateManager = taskStateManager;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setTaskConfiguration(Configuration 
taskConfiguration) {
+               this.taskConfiguration = taskConfiguration;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig 
executionConfig) {
+               this.executionConfig = executionConfig;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
+               this.maxParallelism = maxParallelism;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setSubtaskIndex(int subtaskIndex) {
+               this.subtaskIndex = subtaskIndex;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setUserCodeClassLoader(ClassLoader 
userCodeClassLoader) {
+               this.userCodeClassLoader = userCodeClassLoader;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setJobID(JobID jobID) {
+               this.jobID = jobID;
+               return this;
+       }
+
+       public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
+               this.jobVertexID = jobVertexID;
+               return this;
+       }
+
+       public MockEnvironment build() {
+               return new MockEnvironment(
+                       jobID,
+                       jobVertexID,
+                       taskName,
+                       memorySize,
+                       inputSplitProvider,
+                       bufferSize,
+                       taskConfiguration,
+                       executionConfig,
+                       taskStateManager,
+                       maxParallelism,
+                       parallelism,
+                       subtaskIndex,
+                       userCodeClassLoader);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index a40992c..16485ca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -54,8 +54,12 @@ public abstract class TaskTestBase extends TestLogger {
        public void initEnvironment(long memorySize, int bufferSize) {
                this.memorySize = memorySize;
                this.inputSplitProvider = new MockInputSplitProvider();
-               TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
-               this.mockEnv = new MockEnvironment("mock task", 
this.memorySize, this.inputSplitProvider, bufferSize, taskStateManager);
+               this.mockEnv = new MockEnvironmentBuilder()
+                       .setTaskName("mock task")
+                       .setMemorySize(this.memorySize)
+                       .setInputSplitProvider(this.inputSplitProvider)
+                       .setBufferSize(bufferSize)
+                       .build();
        }
 
        public IteratorWrappingTestSingleInputGate<Record> 
addInput(MutableObjectIterator<Record> input, int groupId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 18c8ac5..84a45d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -66,12 +66,11 @@ public class InputFormatSourceFunctionTest {
                final LifeCycleTestInputFormat format = new 
LifeCycleTestInputFormat();
                final InputFormatSourceFunction<Integer> reader = new 
InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
 
-               try (MockEnvironment environment = new MockEnvironment(
-                       "no",
-                       4 * MemoryManager.DEFAULT_PAGE_SIZE,
-                       null,
-                       16,
-                       new TestTaskStateManager())) {
+               try (MockEnvironment environment =
+                               new MockEnvironmentBuilder()
+                                       .setTaskName("no")
+                                       .setMemorySize(4 * 
MemoryManager.DEFAULT_PAGE_SIZE)
+                                       .build()) {
 
                        reader.setRuntimeContext(new MockRuntimeContext(format, 
noOfSplits, environment));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 6d011a3..a38ffa6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,6 +34,7 @@ import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -148,20 +147,18 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                LocalRecoveryConfig localRecoveryConfig =
                        new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, 
directoryProvider);
 
-               MockEnvironment mockEnvironment = new MockEnvironment(
-                       jobID,
-                       jobVertexID,
-                       "test",
-                       1024L * 1024L,
-                       new MockInputSplitProvider(),
-                       1024 * 1024,
-                       new Configuration(),
-                       new ExecutionConfig(),
-                       new TestTaskStateManager(localRecoveryConfig),
-                       MAX_PARALLELISM,
-                       1,
-                       subtaskIdx,
-                       getClass().getClassLoader());
+               MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+                       .setJobID(jobID)
+                       .setJobVertexID(jobVertexID)
+                       .setTaskName("test")
+                       .setMemorySize(1024L * 1024L)
+                       .setInputSplitProvider(new MockInputSplitProvider())
+                       .setBufferSize(1024 * 1024)
+                       .setTaskStateManager(new 
TestTaskStateManager(localRecoveryConfig))
+                       .setMaxParallelism(MAX_PARALLELISM)
+                       .setSubtaskIndex(subtaskIdx)
+                       .setUserCodeClassLoader(getClass().getClassLoader())
+                       .build();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Integer> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 35e2fbd..cdd77d3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -650,12 +651,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
        @Nonnull
        private MockEnvironment createMockEnvironment() {
-               return new MockEnvironment(
-                       "foobarTask",
-                       1024 * 1024L,
-                       new MockInputSplitProvider(),
-                       4 * 1024,
-                       new TestTaskStateManager());
+               return new MockEnvironmentBuilder()
+                       .setTaskName("foobarTask")
+                       .setMemorySize(1024 * 1024L)
+                       .setInputSplitProvider(new MockInputSplitProvider())
+                       .setBufferSize(4 * 1024)
+                       .build();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index e980ab7..fd6a953 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -170,12 +170,12 @@ public class StreamOperatorChainingTest {
        }
 
        private MockEnvironment createMockEnvironment(String taskName) {
-               return new MockEnvironment(
-                       taskName,
-                       3 * 1024 * 1024,
-                       new MockInputSplitProvider(),
-                       1024,
-                       new TestTaskStateManager());
+               return new MockEnvironmentBuilder()
+                       .setTaskName(taskName)
+                       .setMemorySize(3 * 1024 * 1024)
+                       .setInputSplitProvider(new MockInputSplitProvider())
+                       .setBufferSize(1024)
+                       .build();
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index caea662..cb31970 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
                TaskInfo mockTaskInfo = mock(TaskInfo.class);
                
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
                when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = new MockEnvironment();
+               Environment mockEnvironment = new 
MockEnvironmentBuilder().build();
 
                StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
@@ -402,7 +403,7 @@ public class StreamTaskTest extends TestLogger {
                final long checkpointId = 42L;
                final long timestamp = 1L;
 
-               MockEnvironment mockEnvironment = new MockEnvironment();
+               MockEnvironment mockEnvironment = new 
MockEnvironmentBuilder().build();
                StreamTask<?, ?> streamTask = spy(new 
EmptyStreamTask(mockEnvironment));
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
 
@@ -505,12 +506,10 @@ public class StreamTaskTest extends TestLogger {
                        null,
                        checkpointResponder);
 
-               MockEnvironment mockEnvironment = new MockEnvironment(
-                       "mock-task",
-                       1024 * MemoryManager.DEFAULT_PAGE_SIZE,
-                       null,
-                       16,
-                       taskStateManager);
+               MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+                       .setTaskName("mock-task")
+                       .setTaskStateManager(taskStateManager)
+                       .build();
 
                StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
@@ -603,7 +602,7 @@ public class StreamTaskTest extends TestLogger {
                final OneShotLatch createSubtask = new OneShotLatch();
                final OneShotLatch completeSubtask = new OneShotLatch();
 
-               Environment mockEnvironment = spy(new MockEnvironment());
+               Environment mockEnvironment = spy(new 
MockEnvironmentBuilder().build());
 
                whenNew(OperatorSnapshotFinalizer.class).
                        withAnyArguments().
@@ -689,7 +688,7 @@ public class StreamTaskTest extends TestLogger {
                final long checkpointId = 42L;
                final long timestamp = 1L;
 
-               Environment mockEnvironment = spy(new MockEnvironment());
+               Environment mockEnvironment = spy(new 
MockEnvironmentBuilder().build());
 
                // latch blocks until the async checkpoint thread acknowledges
                final OneShotLatch checkpointCompletedLatch = new 
OneShotLatch();
@@ -769,14 +768,14 @@ public class StreamTaskTest extends TestLogger {
                streamConfig.setStreamOperator(new 
BlockingCloseStreamOperator());
                streamConfig.setOperatorID(new OperatorID());
 
-               try (MockEnvironment mockEnvironment = new MockEnvironment(
-                               "Test Task",
-                               32L * 1024L,
-                               new MockInputSplitProvider(),
-                               1,
-                               taskConfiguration,
-                               new ExecutionConfig(),
-                               new TestTaskStateManager())) {
+               try (MockEnvironment mockEnvironment =
+                               new MockEnvironmentBuilder()
+                                       .setTaskName("Test Task")
+                                       .setMemorySize(32L * 1024L)
+                                       .setInputSplitProvider(new 
MockInputSplitProvider())
+                                       .setBufferSize(1)
+                                       .setTaskConfiguration(taskConfiguration)
+                                       .build()) {
                        StreamTask<Void, BlockingCloseStreamOperator> 
streamTask = new NoOpStreamTask<>(mockEnvironment);
                        final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ed2da18..26ad3ab 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -137,17 +138,15 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                        int subtaskIndex) throws Exception {
                this(
                        operator,
-                       new MockEnvironment(
-                               "MockTask",
-                               3 * 1024 * 1024,
-                               new MockInputSplitProvider(),
-                               1024,
-                               new Configuration(),
-                               new ExecutionConfig(),
-                               new TestTaskStateManager(),
-                               maxParallelism,
-                               parallelism,
-                               subtaskIndex),
+                       new MockEnvironmentBuilder()
+                               .setTaskName("MockTask")
+                               .setMemorySize(3 * 1024 * 1024)
+                               .setInputSplitProvider(new 
MockInputSplitProvider())
+                               .setBufferSize(1024)
+                               .setMaxParallelism(maxParallelism)
+                               .setParallelism(parallelism)
+                               .setSubtaskIndex(subtaskIndex)
+                               .build(),
                        true);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 3f54081..660a333 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -51,12 +51,13 @@ public class SourceFunctionUtil {
        }
 
        private static <T extends Serializable> List<T> 
runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-               try (MockEnvironment environment = new MockEnvironment(
-                       "MockTask",
-                       3 * 1024 * 1024,
-                       new MockInputSplitProvider(),
-                       1024,
-                       new TestTaskStateManager())) {
+               try (MockEnvironment environment =
+                               new MockEnvironmentBuilder()
+                                       .setTaskName("MockTask")
+                                       .setMemorySize(3 * 1024 * 1024)
+                                       .setInputSplitProvider(new 
MockInputSplitProvider())
+                                       .setBufferSize(1024)
+                                       .build()) {
 
                        AbstractStreamOperator<?> operator = 
mock(AbstractStreamOperator.class);
                        when(operator.getExecutionConfig()).thenReturn(new 
ExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f64d87/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index aaa96fb..fe56782 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -37,12 +37,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -351,18 +351,17 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
                        OperatorSubtaskState operatorSubtaskState,
                        Iterable<Long> input) throws Exception {
 
-               try (final MockEnvironment environment = new MockEnvironment(
-                       "test task",
-                       32 * 1024,
-                       new MockInputSplitProvider(),
-                       256,
-                       taskConfiguration,
-                       executionConfig,
-                       new TestTaskStateManager(),
-                       16,
-                       1,
-                       0,
-                       classLoader)) {
+               try (final MockEnvironment environment =
+                               new MockEnvironmentBuilder()
+                                       .setTaskName("test task")
+                                       .setMemorySize(32 * 1024)
+                                       .setInputSplitProvider(new 
MockInputSplitProvider())
+                                       .setBufferSize(256)
+                                       .setTaskConfiguration(taskConfiguration)
+                                       .setExecutionConfig(executionConfig)
+                                       .setMaxParallelism(16)
+                                       .setUserCodeClassLoader(classLoader)
+                                       .build()) {
 
                        OneInputStreamOperatorTestHarness<Long, Long> harness = 
null;
                        try {

Reply via email to