This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 933c27f963c5ef53be07a31c8701737b83b28e13
Author: Nico Kruber <[email protected]>
AuthorDate: Fri Sep 7 13:38:54 2018 +0200

    [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
    
    This closes #6670.
---
 .../StreamNetworkBenchmarkEnvironment.java         | 46 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 ++++-
 .../StreamNetworkThroughputBenchmark.java          | 24 ++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 6b53488..bfaed43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -68,11 +68,6 @@ import static 
org.apache.flink.util.MathUtils.checkedDownCast;
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-       private static final int BUFFER_SIZE =
-               
checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
-
-       private static final int NUM_SLOTS_AND_THREADS = 1;
-
        private static final InetAddress LOCAL_ADDRESS;
 
        static {
@@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
 
        protected ResultPartitionID[] partitionIds;
 
+       public void setUp(
+                       int writers,
+                       int channels,
+                       boolean localMode,
+                       int senderBufferPoolSize,
+                       int receiverBufferPoolSize) throws Exception {
+               setUp(
+                       writers,
+                       channels,
+                       localMode,
+                       senderBufferPoolSize,
+                       receiverBufferPoolSize,
+                       new Configuration());
+       }
+
        /**
         * Sets up the environment including buffer pools and netty threads.
         *
@@ -115,7 +125,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        int channels,
                        boolean localMode,
                        int senderBufferPoolSize,
-                       int receiverBufferPoolSize) throws Exception {
+                       int receiverBufferPoolSize,
+                       Configuration config) throws Exception {
                this.localMode = localMode;
                this.channels = channels;
                this.partitionIds = new ResultPartitionID[writers];
@@ -128,13 +139,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
 
                ioManager = new IOManagerAsync();
 
-               senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+               senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, 
config);
                senderEnv.start();
                if (localMode && senderBufferPoolSize == 
receiverBufferPoolSize) {
                        receiverEnv = senderEnv;
                }
                else {
-                       receiverEnv = 
createNettyNetworkEnvironment(receiverBufferPoolSize);
+                       receiverEnv = 
createNettyNetworkEnvironment(receiverBufferPoolSize, config);
                        receiverEnv.start();
                }
 
@@ -179,12 +190,25 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
        }
 
        private NetworkEnvironment createNettyNetworkEnvironment(
-                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize) throws Exception {
+                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize, Configuration config) throws Exception {
+
+               int segmentSize =
+                       checkedDownCast(
+                               
MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
+                                       .getBytes());
+
+               // we need this because many configs have been written with a 
"-1" entry
+               // similar to 
TaskManagerServicesConfiguration#fromConfiguration()
+               // -> please note that this directly influences the number of 
netty threads!
+               int slots = 
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+               if (slots == -1) {
+                       slots = 1;
+               }
 
-               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, segmentSize);
 
                final NettyConnectionManager nettyConnectionManager = new 
NettyConnectionManager(
-                       new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, 
NUM_SLOTS_AND_THREADS, new Configuration()));
+                       new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, 
config));
 
                return new NetworkEnvironment(
                        bufferPool,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62..a8d18e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.types.LongValue;
 
@@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark {
                recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
        }
 
+       public void setUp(long flushTimeout) throws Exception {
+               setUp(flushTimeout, new Configuration());
+       }
+
        /**
         * Initializes the throughput benchmark with the given parameters.
         *
@@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark {
         *              output flushing interval of the
         *              {@link 
org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher 
thread
         */
-       public void setUp(long flushTimeout) throws Exception {
+       public void setUp(long flushTimeout, Configuration config) throws 
Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(1, 1, false, -1, -1);
+               environment.setUp(1, 1, false, -1, -1, config);
 
                receiver = environment.createReceiver();
                recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index c55dd43..28d7f35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark {
                setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
        }
 
+       public void setUp(
+                       int recordWriters,
+                       int channels,
+                       int flushTimeout,
+                       boolean localMode,
+                       int senderBufferPoolSize,
+                       int receiverBufferPoolSize) throws Exception {
+               setUp(
+                       recordWriters,
+                       channels,
+                       flushTimeout,
+                       localMode,
+                       senderBufferPoolSize,
+                       receiverBufferPoolSize,
+                       new Configuration()
+               );
+       }
+
        /**
         * Initializes the throughput benchmark with the given parameters.
         *
@@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark {
                        int flushTimeout,
                        boolean localMode,
                        int senderBufferPoolSize,
-                       int receiverBufferPoolSize) throws Exception {
+                       int receiverBufferPoolSize,
+                       Configuration config) throws Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(recordWriters, channels, localMode, 
senderBufferPoolSize, receiverBufferPoolSize);
+               environment.setUp(recordWriters, channels, localMode, 
senderBufferPoolSize, receiverBufferPoolSize, config);
                receiver = environment.createReceiver();
                writerThreads = new LongRecordWriterThread[recordWriters];
                for (int writer = 0; writer < recordWriters; writer++) {

Reply via email to