NicoK closed pull request #6670: [FLINK-10301][network] extend 
StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
URL: https://github.com/apache/flink/pull/6670
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
index 580612ca03b..a93e9d2629d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -40,6 +40,7 @@ public SerializingLongReceiver(InputGate inputGate, int 
expectedRepetitionsOfExp
                        });
        }
 
+       @Override
        protected void readRecords(long lastExpectedRecord) throws Exception {
                LOG.debug("readRecords(lastExpectedRecord = {})", 
lastExpectedRecord);
                final LongValue value = new LongValue();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 6b53488a5d3..bfaed437797 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -68,11 +68,6 @@
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-       private static final int BUFFER_SIZE =
-               
checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
-
-       private static final int NUM_SLOTS_AND_THREADS = 1;
-
        private static final InetAddress LOCAL_ADDRESS;
 
        static {
@@ -96,6 +91,21 @@
 
        protected ResultPartitionID[] partitionIds;
 
+       public void setUp(
+                       int writers,
+                       int channels,
+                       boolean localMode,
+                       int senderBufferPoolSize,
+                       int receiverBufferPoolSize) throws Exception {
+               setUp(
+                       writers,
+                       channels,
+                       localMode,
+                       senderBufferPoolSize,
+                       receiverBufferPoolSize,
+                       new Configuration());
+       }
+
        /**
         * Sets up the environment including buffer pools and netty threads.
         *
@@ -115,7 +125,8 @@ public void setUp(
                        int channels,
                        boolean localMode,
                        int senderBufferPoolSize,
-                       int receiverBufferPoolSize) throws Exception {
+                       int receiverBufferPoolSize,
+                       Configuration config) throws Exception {
                this.localMode = localMode;
                this.channels = channels;
                this.partitionIds = new ResultPartitionID[writers];
@@ -128,13 +139,13 @@ public void setUp(
 
                ioManager = new IOManagerAsync();
 
-               senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+               senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, 
config);
                senderEnv.start();
                if (localMode && senderBufferPoolSize == 
receiverBufferPoolSize) {
                        receiverEnv = senderEnv;
                }
                else {
-                       receiverEnv = 
createNettyNetworkEnvironment(receiverBufferPoolSize);
+                       receiverEnv = 
createNettyNetworkEnvironment(receiverBufferPoolSize, config);
                        receiverEnv.start();
                }
 
@@ -179,12 +190,25 @@ private void generatePartitionIds() throws Exception {
        }
 
        private NetworkEnvironment createNettyNetworkEnvironment(
-                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize) throws Exception {
+                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize, Configuration config) throws Exception {
+
+               int segmentSize =
+                       checkedDownCast(
+                               
MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
+                                       .getBytes());
+
+               // we need this because many configs have been written with a 
"-1" entry
+               // similar to 
TaskManagerServicesConfiguration#fromConfiguration()
+               // -> please note that this directly influences the number of 
netty threads!
+               int slots = 
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+               if (slots == -1) {
+                       slots = 1;
+               }
 
-               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, segmentSize);
 
                final NettyConnectionManager nettyConnectionManager = new 
NettyConnectionManager(
-                       new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, 
NUM_SLOTS_AND_THREADS, new Configuration()));
+                       new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, 
config));
 
                return new NetworkEnvironment(
                        bufferPool,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62e04a..a8d18e4ef10 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.types.LongValue;
 
@@ -61,6 +62,10 @@ public void executeBenchmark(long records, boolean 
flushAfterLastEmit) throws Ex
                recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
        }
 
+       public void setUp(long flushTimeout) throws Exception {
+               setUp(flushTimeout, new Configuration());
+       }
+
        /**
         * Initializes the throughput benchmark with the given parameters.
         *
@@ -68,9 +73,9 @@ public void executeBenchmark(long records, boolean 
flushAfterLastEmit) throws Ex
         *              output flushing interval of the
         *              {@link 
org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher 
thread
         */
-       public void setUp(long flushTimeout) throws Exception {
+       public void setUp(long flushTimeout, Configuration config) throws 
Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(1, 1, false, -1, -1);
+               environment.setUp(1, 1, false, -1, -1, config);
 
                receiver = environment.createReceiver();
                recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 1b0ef8af438..28d7f3556a0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -60,7 +61,25 @@ public void setUp(int recordWriters, int channels, int 
flushTimeout) throws Exce
        }
 
        public void setUp(int recordWriters, int channels, int flushTimeout, 
boolean localMode) throws Exception {
-               setUp(recordWriters, channels, flushTimeout, false, -1, -1);
+               setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
+       }
+
+       public void setUp(
+                       int recordWriters,
+                       int channels,
+                       int flushTimeout,
+                       boolean localMode,
+                       int senderBufferPoolSize,
+                       int receiverBufferPoolSize) throws Exception {
+               setUp(
+                       recordWriters,
+                       channels,
+                       flushTimeout,
+                       localMode,
+                       senderBufferPoolSize,
+                       receiverBufferPoolSize,
+                       new Configuration()
+               );
        }
 
        /**
@@ -78,9 +97,10 @@ public void setUp(
                        int flushTimeout,
                        boolean localMode,
                        int senderBufferPoolSize,
-                       int receiverBufferPoolSize) throws Exception {
+                       int receiverBufferPoolSize,
+                       Configuration config) throws Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(recordWriters, channels, localMode, 
senderBufferPoolSize, receiverBufferPoolSize);
+               environment.setUp(recordWriters, channels, localMode, 
senderBufferPoolSize, receiverBufferPoolSize, config);
                receiver = environment.createReceiver();
                writerThreads = new LongRecordWriterThread[recordWriters];
                for (int writer = 0; writer < recordWriters; writer++) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to