This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 92253f2 [FLINK-14818][benchmark] Fix InputGate setup logic of StreamNetworkBenchmarkEnvironment 92253f2 is described below commit 92253f2e15090f5dac8cfc68c49727b62da23b8c Author: kevin.cyj <kevin....@alibaba-inc.com> AuthorDate: Tue Nov 19 11:56:16 2019 +0800 [FLINK-14818][benchmark] Fix InputGate setup logic of StreamNetworkBenchmarkEnvironment Before this change, in the network benchmarks (for example, the 1000-channel benchmark with 4 record writers), StreamNetworkBenchmarkEnvironment#createInputGate created 1000 input gates with 4 input channels each, which doesn't make much sense. This commit changes that to a single receiver with 4 input gates, each with 1000 channels. This is achieved by providing testing implementations of InputChannels that use the channel index, instead of the subpartition index, when requesting subpartitions from the ResultPartition. Thanks to that, we can map a single ResultPartition with N subpartitions to a single InputGate instance with N channels. 
The change also influences the benchmark results: overall, performance drops slightly because of the decrease in floating buffers. The following are the benchmark results before and after this change: ------------------------------------------------------------------Before---------------------------------------------------------------------- Benchmark (channelsFlushTimeout) Mode Cnt Score Error Units DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput N/A thrpt 30 17079.534 ± 830.532 ops/ms StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput N/A thrpt 30 599.664 ± 13.325 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 100,100ms thrpt 30 45629.898 ± 1623.455 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 100,100ms,SSL thrpt 30 9817.421 ± 216.075 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 1000,1ms thrpt 30 25442.152 ± 968.340 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 1000,100ms thrpt 30 27944.285 ± 518.106 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 1000,100ms,SSL thrpt 30 7820.549 ± 895.862 ops/ms StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1 N/A avgt 30 13.184 ± 0.093 ms/op ------------------------------------------------------------------After----------------------------------------------------------------------- Benchmark (channelsFlushTimeout) Mode Cnt Score Error Units DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput N/A thrpt 30 17345.574 ± 370.647 ops/ms StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput N/A thrpt 30 608.881 ± 12.054 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 100,100ms thrpt 30 41732.518 ± 1109.436 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 100,100ms,SSL thrpt 30 9689.525 ± 202.895 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 1000,1ms thrpt 30 24106.705 ± 
2952.364 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 1000,100ms thrpt 30 27509.665 ± 3246.965 ops/ms StreamNetworkThroughputBenchmarkExecutor.networkThroughput 1000,100ms,SSL thrpt 30 7691.287 ± 927.775 ops/ms StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1 N/A avgt 30 12.758 ± 0.147 ms/op --- .../partition/consumer/LocalInputChannel.java | 2 +- .../partition/consumer/SingleInputGateFactory.java | 22 ++- .../benchmark/SingleInputGateBenchmarkFactory.java | 181 +++++++++++++++++++++ .../StreamNetworkBenchmarkEnvironment.java | 40 +++-- 4 files changed, 219 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 9118ab3..62a8c7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -95,7 +95,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit // ------------------------------------------------------------------------ @Override - void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { + protected void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { boolean retriggerRequest = false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index a47c8da..dca505d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -54,23 +54,23 @@ public class SingleInputGateFactory { private static final Logger LOG = LoggerFactory.getLogger(SingleInputGateFactory.class); @Nonnull - private final ResourceID taskExecutorResourceId; + protected final ResourceID taskExecutorResourceId; - private final int partitionRequestInitialBackoff; + protected final int partitionRequestInitialBackoff; - private final int partitionRequestMaxBackoff; + protected final int partitionRequestMaxBackoff; @Nonnull - private final ConnectionManager connectionManager; + protected final ConnectionManager connectionManager; @Nonnull - private final ResultPartitionManager partitionManager; + protected final ResultPartitionManager partitionManager; @Nonnull - private final TaskEventPublisher taskEventPublisher; + protected final TaskEventPublisher taskEventPublisher; @Nonnull - private final NetworkBufferPool networkBufferPool; + protected final NetworkBufferPool networkBufferPool; private final int networkBuffersPerChannel; @@ -198,7 +198,8 @@ public class SingleInputGateFactory { metrics)); } - private InputChannel createKnownInputChannel( + @VisibleForTesting + protected InputChannel createKnownInputChannel( SingleInputGate inputGate, int index, NettyShuffleDescriptor inputChannelDescriptor, @@ -243,7 +244,10 @@ public class SingleInputGateFactory { return () -> bufferPoolFactory.createBufferPool(0, floatingNetworkBuffersPerGate); } - private static class ChannelStatistics { + /** + * Statistics of input channels. 
+ */ + protected static class ChannelStatistics { int numLocalChannels; int numRemoteChannels; int numUnknownChannels; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java new file mode 100644 index 0000000..aa1a160 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; + +import java.io.IOException; + +/** + * A benchmark-specific input gate factory which overrides the respective methods of creating + * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting specific subpartitions. 
+ */ +public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory { + + public SingleInputGateBenchmarkFactory( + ResourceID taskExecutorResourceId, + NettyShuffleEnvironmentConfiguration networkConfig, + ConnectionManager connectionManager, + ResultPartitionManager partitionManager, + TaskEventPublisher taskEventPublisher, + NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + SingleInputGateFactory.ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics, + networkBufferPool); + } + } + + /** + * A {@link LocalInputChannel} which ignores the given subpartition index and uses channel index + * instead when requesting subpartition. 
+ */ + static class TestLocalInputChannel extends LocalInputChannel { + + private final ResultPartitionID newPartitionID = new ResultPartitionID(); + + public TestLocalInputChannel( + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + ResultPartitionManager partitionManager, + TaskEventPublisher taskEventPublisher, + int initialBackoff, + int maxBackoff, + InputChannelMetrics metrics) { + super( + inputGate, + channelIndex, + partitionId, + partitionManager, + taskEventPublisher, + initialBackoff, + maxBackoff, + metrics); + } + + @Override + public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { + super.requestSubpartition(channelIndex); + } + + @Override + public ResultPartitionID getPartitionId() { + // the SingleInputGate assumes that all InputChannels are consuming different ResultPartition + // so can be distinguished by ResultPartitionID. However, the micro benchmark breaks this and + // all InputChannels in a SingleInputGate consume data from the same ResultPartition. To make + // it transparent to SingleInputGate, a new and unique ResultPartitionID is returned here + return newPartitionID; + } + } + + /** + * A {@link RemoteInputChannel} which ignores the given subpartition index and uses channel index + * instead when requesting subpartition. 
+ */ + static class TestRemoteInputChannel extends RemoteInputChannel { + + private final ResultPartitionID newPartitionID = new ResultPartitionID(); + + public TestRemoteInputChannel( + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + ConnectionID connectionId, + ConnectionManager connectionManager, + int initialBackOff, + int maxBackoff, + InputChannelMetrics metrics, + MemorySegmentProvider memorySegmentProvider) { + super( + inputGate, + channelIndex, + partitionId, + connectionId, + connectionManager, + initialBackOff, + maxBackoff, + metrics, + memorySegmentProvider); + } + + @Override + public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { + super.requestSubpartition(channelIndex); + } + + @Override + public ResultPartitionID getPartitionId() { + // the SingleInputGate assumes that all InputChannels are consuming different ResultPartition + // so can be distinguished by ResultPartitionID. However, the micro benchmark breaks this and + // all InputChannels in a SingleInputGate consume data from the same ResultPartition. 
To make + // it transparent to SingleInputGate, a new and unique ResultPartitionID is returned here + return newPartitionID; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index aaeb1b0..60a4ba2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.Arrays; import static org.apache.flink.util.ExceptionUtils.suppressExceptions; @@ -142,7 +141,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { receiverEnv.start(); } - gateFactory = new SingleInputGateFactory( + gateFactory = new SingleInputGateBenchmarkFactory( location, receiverEnv.getConfiguration(), receiverEnv.getConnectionManager(), @@ -158,6 +157,12 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { suppressExceptions(receiverEnv::close); } + /** + * Note: It should be guaranteed that {@link #createResultPartitionWriter(int)} has been + * called before creating the receiver. Otherwise it might cause unexpected behaviors when + * {@link org.apache.flink.runtime.io.network.partition.PartitionNotFoundException} happens + * in {@link SingleInputGateBenchmarkFactory.TestRemoteInputChannel}. 
+ */ public SerializingLongReceiver createReceiver() throws Exception { TaskManagerLocation senderLocation = new TaskManagerLocation( ResourceID.generate(), @@ -216,20 +221,20 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { } private InputGate createInputGate(TaskManagerLocation senderLocation) throws Exception { - InputGate[] gates = new InputGate[channels]; - for (int channel = 0; channel < channels; ++channel) { + InputGate[] gates = new InputGate[partitionIds.length]; + for (int gateIndex = 0; gateIndex < gates.length; ++gateIndex) { final InputGateDeploymentDescriptor gateDescriptor = createInputGateDeploymentDescriptor( senderLocation, - channel, + gateIndex, location); - final InputGate gate = createInputGateWithMetrics(gateFactory, gateDescriptor, channel); + final InputGate gate = createInputGateWithMetrics(gateFactory, gateDescriptor, gateIndex); gate.setup(); - gates[channel] = gate; + gates[gateIndex] = gate; } - if (channels > 1) { + if (gates.length > 1) { return new UnionInputGate(gates); } else { return gates[0]; @@ -238,18 +243,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { private InputGateDeploymentDescriptor createInputGateDeploymentDescriptor( TaskManagerLocation senderLocation, - int consumedSubpartitionIndex, + int gateIndex, ResourceID localLocation) { - final ShuffleDescriptor[] channelDescriptors = Arrays.stream(partitionIds) - .map(partitionId -> - createShuffleDescriptor(localMode, partitionId, localLocation, senderLocation, consumedSubpartitionIndex)) - .toArray(ShuffleDescriptor[]::new); + final ShuffleDescriptor[] channelDescriptors = new ShuffleDescriptor[channels]; + for (int channelIndex = 0; channelIndex < channels; ++channelIndex) { + channelDescriptors[channelIndex] = createShuffleDescriptor( + localMode, partitionIds[gateIndex], localLocation, senderLocation, channelIndex); + } return new InputGateDeploymentDescriptor( dataSetID, 
ResultPartitionType.PIPELINED_BOUNDED, - consumedSubpartitionIndex, + // 0 is used because TestRemoteInputChannel and TestLocalInputChannel will + // ignore this and use channelIndex instead when requesting a subpartition + 0, channelDescriptors); } @@ -272,11 +280,11 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, - int channel) { + int connectionIndex) { final NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder() .setId(resultPartitionID) .setProducerInfoFromTaskManagerLocation(senderLocation) - .setConnectionIndex(channel); + .setConnectionIndex(connectionIndex); return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote(); } }