This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 933c27f963c5ef53be07a31c8701737b83b28e13 Author: Nico Kruber <[email protected]> AuthorDate: Fri Sep 7 13:38:54 2018 +0200 [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances This closes #6670. --- .../StreamNetworkBenchmarkEnvironment.java | 46 ++++++++++++++++------ .../StreamNetworkPointToPointBenchmark.java | 9 ++++- .../StreamNetworkThroughputBenchmark.java | 24 ++++++++++- 3 files changed, 64 insertions(+), 15 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 6b53488..bfaed43 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -68,11 +68,6 @@ import static org.apache.flink.util.MathUtils.checkedDownCast; */ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { - private static final int BUFFER_SIZE = - checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()); - - private static final int NUM_SLOTS_AND_THREADS = 1; - private static final InetAddress LOCAL_ADDRESS; static { @@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { protected ResultPartitionID[] partitionIds; + public void setUp( + int writers, + int channels, + boolean localMode, + int senderBufferPoolSize, + int receiverBufferPoolSize) throws Exception { + setUp( + writers, + channels, + localMode, + senderBufferPoolSize, + receiverBufferPoolSize, + new Configuration()); + } + /** * Sets up the environment including buffer pools and netty threads. * @@ -115,7 +125,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { int channels, boolean localMode, int senderBufferPoolSize, - int receiverBufferPoolSize) throws Exception { + int receiverBufferPoolSize, + Configuration config) throws Exception { this.localMode = localMode; this.channels = channels; this.partitionIds = new ResultPartitionID[writers]; @@ -128,13 +139,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { ioManager = new IOManagerAsync(); - senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize); + senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config); senderEnv.start(); if (localMode && senderBufferPoolSize == receiverBufferPoolSize) { receiverEnv = senderEnv; } else { - receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize); + receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config); receiverEnv.start(); } @@ -179,12 +190,25 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { } private NetworkEnvironment createNettyNetworkEnvironment( - @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { + @SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception { + + int segmentSize = + checkedDownCast( + MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)) + .getBytes()); + + // we need this because many configs have been written with a "-1" entry + // similar to TaskManagerServicesConfiguration#fromConfiguration() + // -> please note that this directly influences the number of netty threads! + int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); + if (slots == -1) { + slots = 1; + } - final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); + final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize); final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( - new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); + new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config)); return new NetworkEnvironment( bufferPool, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index 6b96c62..a8d18e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io.benchmark; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.types.LongValue; @@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark { recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); } + public void setUp(long flushTimeout) throws Exception { + setUp(flushTimeout, new Configuration()); + } + /** * Initializes the throughput benchmark with the given parameters. * @@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark { * output flushing interval of the * {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread */ - public void setUp(long flushTimeout) throws Exception { + public void setUp(long flushTimeout, Configuration config) throws Exception { environment = new StreamNetworkBenchmarkEnvironment<>(); - environment.setUp(1, 1, false, -1, -1); + environment.setUp(1, 1, false, -1, -1, config); receiver = environment.createReceiver(); recordWriter = environment.createRecordWriter(0, flushTimeout); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java index c55dd43..28d7f35 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io.benchmark; +import org.apache.flink.configuration.Configuration; import org.apache.flink.types.LongValue; import java.util.concurrent.CompletableFuture; @@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark { setUp(recordWriters, channels, flushTimeout, localMode, -1, -1); } + public void setUp( + int recordWriters, + int channels, + int flushTimeout, + boolean localMode, + int senderBufferPoolSize, + int receiverBufferPoolSize) throws Exception { + setUp( + recordWriters, + channels, + flushTimeout, + localMode, + senderBufferPoolSize, + receiverBufferPoolSize, + new Configuration() + ); + } + /** * Initializes the throughput benchmark with the given parameters. * @@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark { int flushTimeout, boolean localMode, int senderBufferPoolSize, - int receiverBufferPoolSize) throws Exception { + int receiverBufferPoolSize, + Configuration config) throws Exception { environment = new StreamNetworkBenchmarkEnvironment<>(); - environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize); + environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config); receiver = environment.createReceiver(); writerThreads = new LongRecordWriterThread[recordWriters]; for (int writer = 0; writer < recordWriters; writer++) {
