This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new edebb10 [FLINK-11721][network] Remove IOMode from NetworkEnvironment
edebb10 is described below
commit edebb108d0d0477efba81e55b07339755739dd39
Author: Zhijiang <[email protected]>
AuthorDate: Fri Feb 22 14:20:11 2019 +0800
[FLINK-11721][network] Remove IOMode from NetworkEnvironment
---
.../apache/flink/runtime/io/network/NetworkEnvironment.java | 12 ------------
.../flink/runtime/taskexecutor/TaskManagerServices.java | 1 -
.../apache/flink/runtime/taskmanager/TaskAsyncCallTest.java | 2 --
.../java/org/apache/flink/runtime/taskmanager/TaskTest.java | 3 ---
.../io/benchmark/StreamNetworkBenchmarkEnvironment.java | 2 --
.../apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 1 -
6 files changed, 21 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 89e23da..b18f844 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -74,8 +72,6 @@ public class NetworkEnvironment {
/** Registry for {@link InternalKvState} instances. */
private final KvStateRegistry kvStateRegistry;
- private final IOManager.IOMode defaultIOMode;
-
private final int partitionRequestInitialBackoff;
private final int partitionRequestMaxBackoff;
@@ -106,7 +102,6 @@ public class NetworkEnvironment {
new KvStateRegistry(),
null,
null,
- IOManager.IOMode.SYNC,
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
networkBuffersPerChannel,
@@ -122,7 +117,6 @@ public class NetworkEnvironment {
KvStateRegistry kvStateRegistry,
KvStateServer kvStateServer,
KvStateClientProxy kvStateClientProxy,
- IOMode defaultIOMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
@@ -138,8 +132,6 @@ public class NetworkEnvironment {
this.kvStateServer = kvStateServer;
this.kvStateProxy = kvStateClientProxy;
- this.defaultIOMode = defaultIOMode;
-
this.partitionRequestInitialBackoff =
partitionRequestInitialBackoff;
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
@@ -170,10 +162,6 @@ public class NetworkEnvironment {
return networkBufferPool;
}
- public IOMode getDefaultIOMode() {
- return defaultIOMode;
- }
-
public int getPartitionRequestInitialBackoff() {
return partitionRequestInitialBackoff;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index cfce18d..8c566f6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -460,7 +460,6 @@ public class TaskManagerServices {
kvStateRegistry,
kvStateServer,
kvClientProxy,
- networkEnvironmentConfiguration.ioMode(),
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
networkEnvironmentConfiguration.networkBuffersPerChannel(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 66c6804..353556e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -55,7 +55,6 @@ import
org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
@@ -225,7 +224,6 @@ public class TaskAsyncCallTest extends TestLogger {
TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
NetworkEnvironment networkEnvironment =
mock(NetworkEnvironment.class);
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
- when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 8988644..3447851 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -258,7 +258,6 @@ public class TaskTest extends TestLogger {
final NetworkEnvironment network =
mock(NetworkEnvironment.class);
when(network.getResultPartitionManager()).thenReturn(partitionManager);
- when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
doThrow(new
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
@@ -576,7 +575,6 @@ public class TaskTest extends TestLogger {
final ResultPartitionConsumableNotifier consumableNotifier =
new NoOpResultPartitionConsumableNotifier();
final NetworkEnvironment network =
mock(NetworkEnvironment.class);
when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
- when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(network.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
@@ -950,7 +948,6 @@ public class TaskTest extends TestLogger {
final TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
networkEnvironment = mock(NetworkEnvironment.class);
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
- when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 3a69875..ed14db4 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -29,7 +29,6 @@ import
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -222,7 +221,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends
IOReadableWritable> {
new KvStateRegistry(),
null,
null,
- IOMode.SYNC,
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1ce1c46..372a9d8 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -901,7 +901,6 @@ public class StreamTaskTest extends TestLogger {
NetworkEnvironment network = mock(NetworkEnvironment.class);
when(network.getResultPartitionManager()).thenReturn(partitionManager);
- when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(network.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);