This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new edebb10  [FLINK-11721][network] Remove IOMode from NetworkEnvironment
edebb10 is described below

commit edebb108d0d0477efba81e55b07339755739dd39
Author: Zhijiang <[email protected]>
AuthorDate: Fri Feb 22 14:20:11 2019 +0800

    [FLINK-11721][network] Remove IOMode from NetworkEnvironment
---
 .../apache/flink/runtime/io/network/NetworkEnvironment.java  | 12 ------------
 .../flink/runtime/taskexecutor/TaskManagerServices.java      |  1 -
 .../apache/flink/runtime/taskmanager/TaskAsyncCallTest.java  |  2 --
 .../java/org/apache/flink/runtime/taskmanager/TaskTest.java  |  3 ---
 .../io/benchmark/StreamNetworkBenchmarkEnvironment.java      |  2 --
 .../apache/flink/streaming/runtime/tasks/StreamTaskTest.java |  1 -
 6 files changed, 21 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 89e23da..b18f844 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -74,8 +72,6 @@ public class NetworkEnvironment {
        /** Registry for {@link InternalKvState} instances. */
        private final KvStateRegistry kvStateRegistry;
 
-       private final IOManager.IOMode defaultIOMode;
-
        private final int partitionRequestInitialBackoff;
 
        private final int partitionRequestMaxBackoff;
@@ -106,7 +102,6 @@ public class NetworkEnvironment {
                        new KvStateRegistry(),
                        null,
                        null,
-                       IOManager.IOMode.SYNC,
                        partitionRequestInitialBackoff,
                        partitionRequestMaxBackoff,
                        networkBuffersPerChannel,
@@ -122,7 +117,6 @@ public class NetworkEnvironment {
                KvStateRegistry kvStateRegistry,
                KvStateServer kvStateServer,
                KvStateClientProxy kvStateClientProxy,
-               IOMode defaultIOMode,
                int partitionRequestInitialBackoff,
                int partitionRequestMaxBackoff,
                int networkBuffersPerChannel,
@@ -138,8 +132,6 @@ public class NetworkEnvironment {
                this.kvStateServer = kvStateServer;
                this.kvStateProxy = kvStateClientProxy;
 
-               this.defaultIOMode = defaultIOMode;
-
                this.partitionRequestInitialBackoff = 
partitionRequestInitialBackoff;
                this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
 
@@ -170,10 +162,6 @@ public class NetworkEnvironment {
                return networkBufferPool;
        }
 
-       public IOMode getDefaultIOMode() {
-               return defaultIOMode;
-       }
-
        public int getPartitionRequestInitialBackoff() {
                return partitionRequestInitialBackoff;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index cfce18d..8c566f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -460,7 +460,6 @@ public class TaskManagerServices {
                        kvStateRegistry,
                        kvStateServer,
                        kvClientProxy,
-                       networkEnvironmentConfiguration.ioMode(),
                        
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
                        
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
                        
networkEnvironmentConfiguration.networkBuffersPerChannel(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 66c6804..353556e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -55,7 +55,6 @@ import 
org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
@@ -225,7 +224,6 @@ public class TaskAsyncCallTest extends TestLogger {
                TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
                
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-               
when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 8988644..3447851 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -258,7 +258,6 @@ public class TaskTest extends TestLogger {
 
                final NetworkEnvironment network = 
mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(partitionManager);
-               
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
                doThrow(new 
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
@@ -576,7 +575,6 @@ public class TaskTest extends TestLogger {
                final ResultPartitionConsumableNotifier consumableNotifier = 
new NoOpResultPartitionConsumableNotifier();
                final NetworkEnvironment network = 
mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
-               
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                        .thenReturn(mock(TaskKvStateRegistry.class));
                
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
@@ -950,7 +948,6 @@ public class TaskTest extends TestLogger {
                        final TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                        networkEnvironment = mock(NetworkEnvironment.class);
                        
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-                       
when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                        
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
                        
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 3a69875..ed14db4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -222,7 +221,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        new KvStateRegistry(),
                        null,
                        null,
-                       IOMode.SYNC,
                        
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
                        
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
                        
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1ce1c46..372a9d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -901,7 +901,6 @@ public class StreamTaskTest extends TestLogger {
 
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(partitionManager);
-               
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
                
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

Reply via email to