pnowojski closed pull request #6891: [FLINK-10606][network][test] Construct
NetworkEnvironment simple for tests
URL: https://github.com/apache/flink/pull/6891
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f2547563872..1363175e96e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -90,19 +90,43 @@
private boolean isShutdown;
public NetworkEnvironment(
- NetworkBufferPool networkBufferPool,
- ConnectionManager connectionManager,
- ResultPartitionManager resultPartitionManager,
- TaskEventDispatcher taskEventDispatcher,
- KvStateRegistry kvStateRegistry,
- KvStateServer kvStateServer,
- KvStateClientProxy kvStateClientProxy,
- IOMode defaultIOMode,
- int partitionRequestInitialBackoff,
- int partitionRequestMaxBackoff,
- int networkBuffersPerChannel,
- int extraNetworkBuffersPerGate,
- boolean enableCreditBased) {
+ int numBuffers,
+ int memorySegmentSize,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate,
+ boolean enableCreditBased) {
+ this(
+ new NetworkBufferPool(numBuffers, memorySegmentSize),
+ new LocalConnectionManager(),
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOManager.IOMode.SYNC,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ networkBuffersPerChannel,
+ extraNetworkBuffersPerGate,
+ enableCreditBased);
+ }
+
+ public NetworkEnvironment(
+ NetworkBufferPool networkBufferPool,
+ ConnectionManager connectionManager,
+ ResultPartitionManager resultPartitionManager,
+ TaskEventDispatcher taskEventDispatcher,
+ KvStateRegistry kvStateRegistry,
+ KvStateServer kvStateServer,
+ KvStateClientProxy kvStateClientProxy,
+ IOMode defaultIOMode,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate,
+ boolean enableCreditBased) {
this.networkBufferPool = checkNotNull(networkBufferPool);
this.connectionManager = checkNotNull(connectionManager);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f0f1926b008..8c2fb7a15f0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -31,7 +30,6 @@
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -79,21 +77,8 @@
*/
@Test
public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-
final NetworkEnvironment network = new NetworkEnvironment(
- new NetworkBufferPool(numBuffers, memorySegmentSize),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOManager.IOMode.SYNC,
- 0,
- 0,
- 2,
- 8,
- enableCreditBasedFlowControl);
+ numBuffers, memorySegmentSize, 0, 0, 2, 8,
enableCreditBasedFlowControl);
// result partitions
ResultPartition rp1 =
createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -197,19 +182,7 @@ public void testRegisterTaskWithInsufficientBuffers()
throws Exception {
private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize)
throws Exception {
final NetworkEnvironment network = new NetworkEnvironment(
- new NetworkBufferPool(bufferPoolSize,
memorySegmentSize),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOManager.IOMode.SYNC,
- 0,
- 0,
- 2,
- 8,
- enableCreditBasedFlowControl);
+ bufferPoolSize, memorySegmentSize, 0, 0, 2, 8,
enableCreditBasedFlowControl);
final ConnectionManager connManager =
createDummyConnectionManager();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4bf5b220be8..63f18551130 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -25,7 +25,6 @@
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -45,7 +44,6 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -71,7 +69,6 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -344,7 +341,8 @@ public void testRequestBackoffConfiguration() throws
Exception {
int initialBackoff = 137;
int maxBackoff = 1001;
- final NetworkEnvironment netEnv = createNetworkEnvironment(2,
8, initialBackoff, maxBackoff);
+ final NetworkEnvironment netEnv = new NetworkEnvironment(
+ 100, 32, initialBackoff, maxBackoff, 2, 8,
enableCreditBasedFlowControl);
SingleInputGate gate = SingleInputGate.create(
"TestTask",
@@ -403,8 +401,8 @@ public void testRequestBuffersWithRemoteInputChannel()
throws Exception {
final SingleInputGate inputGate = createInputGate(1,
ResultPartitionType.PIPELINED_BOUNDED);
int buffersPerChannel = 2;
int extraNetworkBuffersPerGate = 8;
- final NetworkEnvironment network =
createNetworkEnvironment(buffersPerChannel,
- extraNetworkBuffersPerGate, 0, 0);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ 100, 32, 0, 0, buffersPerChannel,
extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
try {
final ResultPartitionID resultPartitionId = new
ResultPartitionID();
@@ -415,8 +413,6 @@ public void testRequestBuffersWithRemoteInputChannel()
throws Exception {
NetworkBufferPool bufferPool =
network.getNetworkBufferPool();
if (enableCreditBasedFlowControl) {
- verify(bufferPool,
-
times(1)).requestMemorySegments(buffersPerChannel);
RemoteInputChannel remote =
(RemoteInputChannel) inputGate.getInputChannels()
.get(resultPartitionId.getPartitionId());
// only the exclusive buffers should be
assigned/available now
@@ -444,7 +440,8 @@ public void testRequestBuffersWithUnknownInputChannel()
throws Exception {
final SingleInputGate inputGate = createInputGate(1,
ResultPartitionType.PIPELINED_BOUNDED);
int buffersPerChannel = 2;
int extraNetworkBuffersPerGate = 8;
- final NetworkEnvironment network =
createNetworkEnvironment(buffersPerChannel, extraNetworkBuffersPerGate, 0, 0);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ 100, 32, 0, 0, buffersPerChannel,
extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
try {
final ResultPartitionID resultPartitionId = new
ResultPartitionID();
@@ -454,8 +451,6 @@ public void testRequestBuffersWithUnknownInputChannel()
throws Exception {
NetworkBufferPool bufferPool =
network.getNetworkBufferPool();
if (enableCreditBasedFlowControl) {
- verify(bufferPool,
times(0)).requestMemorySegments(buffersPerChannel);
-
assertEquals(bufferPool.getTotalNumberOfMemorySegments(),
bufferPool.getNumberOfAvailableMemorySegments());
// note: exclusive buffers are not handed out
into LocalBufferPool and are thus not counted
@@ -471,8 +466,6 @@ public void testRequestBuffersWithUnknownInputChannel()
throws Exception {
ResultPartitionLocation.createRemote(connectionId)));
if (enableCreditBasedFlowControl) {
- verify(bufferPool,
-
times(1)).requestMemorySegments(buffersPerChannel);
RemoteInputChannel remote =
(RemoteInputChannel) inputGate.getInputChannels()
.get(resultPartitionId.getPartitionId());
// only the exclusive buffers should be
assigned/available now
@@ -499,7 +492,8 @@ public void testRequestBuffersWithUnknownInputChannel()
throws Exception {
public void testUpdateUnknownInputChannel() throws Exception {
final SingleInputGate inputGate = createInputGate(2);
int buffersPerChannel = 2;
- final NetworkEnvironment network =
createNetworkEnvironment(buffersPerChannel, 8, 0, 0);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ 100, 32, 0, 0, buffersPerChannel, 8,
enableCreditBasedFlowControl);
try {
final ResultPartitionID localResultPartitionId = new
ResultPartitionID();
@@ -543,27 +537,6 @@ public void testUpdateUnknownInputChannel() throws
Exception {
//
---------------------------------------------------------------------------------------------
- private NetworkEnvironment createNetworkEnvironment(
- int buffersPerChannel,
- int extraNetworkBuffersPerGate,
- int initialBackoff,
- int maxBackoff) {
- return new NetworkEnvironment(
- spy(new NetworkBufferPool(100, 32)),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOManager.IOMode.SYNC,
- initialBackoff,
- maxBackoff,
- buffersPerChannel,
- extraNetworkBuffersPerGate,
- enableCreditBasedFlowControl);
- }
-
private SingleInputGate createInputGate() {
return createInputGate(2);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9669513a111..c3118c91f9d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -33,11 +33,7 @@
import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.memory.MemoryManager;
@@ -45,7 +41,6 @@
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -146,14 +141,8 @@ public void testComponentsStartupShutdown() throws
Exception {
final MemoryManager memManager = new
MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP,
false);
final IOManager ioManager = new IOManagerAsync(TMP_DIR);
final NetworkEnvironment network = new
NetworkEnvironment(
- new NetworkBufferPool(32,
netConf.networkBufferSize()),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- netConf.ioMode(),
+ 32,
+ netConf.networkBufferSize(),
netConf.partitionRequestInitialBackoff(),
netConf.partitionRequestMaxBackoff(),
netConf.networkBuffersPerChannel(),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services