This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92253f2  [FLINK-14818][benchmark] Fix InputGate setup logic of 
StreamNetworkBenchmarkEnvironment
92253f2 is described below

commit 92253f2e15090f5dac8cfc68c49727b62da23b8c
Author: kevin.cyj <kevin....@alibaba-inc.com>
AuthorDate: Tue Nov 19 11:56:16 2019 +0800

    [FLINK-14818][benchmark] Fix InputGate setup logic of 
StreamNetworkBenchmarkEnvironment
    
    Before this change, in the network benchmarks (for example, the 1000-channel
    benchmark with 4 record writers), StreamNetworkBenchmarkEnvironment#createInputGate
    created 1000 input gates with 4 input channels each, which doesn't make much
    sense. This commit changes that to a single receiver with 4 input gates, each
    with 1000 channels.
    
    This is achieved by providing testing implementations of InputChannel that
    use the channel index, instead of the subpartition index, when requesting
    subpartitions from the ResultPartition. Thanks to that, a single
    ResultPartition with N subpartitions can be mapped to a single InputGate
    instance with N channels.
    
    The change also affects the benchmark results: overall, performance drops
    slightly because of the reduced number of floating buffers. The benchmark
    results before and after this change are as follows:
    
    
    ------------------------------------------------------------------Before----------------------------------------------------------------------
    Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
    DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  17079.534 ±  830.532  ops/ms
    StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    599.664 ±   13.325  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  45629.898 ± 1623.455  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9817.421 ±  216.075  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  25442.152 ±  968.340  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  27944.285 ±  518.106  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   7820.549 ±  895.862  ops/ms
    StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     13.184 ±    0.093   ms/op
    
    
    
    ------------------------------------------------------------------After-----------------------------------------------------------------------
    Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
    DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  17345.574 ±  370.647  ops/ms
    StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    608.881 ±   12.054  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  41732.518 ± 1109.436  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9689.525 ±  202.895  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  24106.705 ± 2952.364  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  27509.665 ± 3246.965  ops/ms
    StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   7691.287 ±  927.775  ops/ms
    StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     12.758 ±    0.147   ms/op
---
 .../partition/consumer/LocalInputChannel.java      |   2 +-
 .../partition/consumer/SingleInputGateFactory.java |  22 ++-
 .../benchmark/SingleInputGateBenchmarkFactory.java | 181 +++++++++++++++++++++
 .../StreamNetworkBenchmarkEnvironment.java         |  40 +++--
 4 files changed, 219 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 9118ab3..62a8c7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -95,7 +95,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
        // 
------------------------------------------------------------------------
 
        @Override
-       void requestSubpartition(int subpartitionIndex) throws IOException, 
InterruptedException {
+       protected void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException {
 
                boolean retriggerRequest = false;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index a47c8da..dca505d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -54,23 +54,23 @@ public class SingleInputGateFactory {
        private static final Logger LOG = 
LoggerFactory.getLogger(SingleInputGateFactory.class);
 
        @Nonnull
-       private final ResourceID taskExecutorResourceId;
+       protected final ResourceID taskExecutorResourceId;
 
-       private final int partitionRequestInitialBackoff;
+       protected final int partitionRequestInitialBackoff;
 
-       private final int partitionRequestMaxBackoff;
+       protected final int partitionRequestMaxBackoff;
 
        @Nonnull
-       private final ConnectionManager connectionManager;
+       protected final ConnectionManager connectionManager;
 
        @Nonnull
-       private final ResultPartitionManager partitionManager;
+       protected final ResultPartitionManager partitionManager;
 
        @Nonnull
-       private final TaskEventPublisher taskEventPublisher;
+       protected final TaskEventPublisher taskEventPublisher;
 
        @Nonnull
-       private final NetworkBufferPool networkBufferPool;
+       protected final NetworkBufferPool networkBufferPool;
 
        private final int networkBuffersPerChannel;
 
@@ -198,7 +198,8 @@ public class SingleInputGateFactory {
                                        metrics));
        }
 
-       private InputChannel createKnownInputChannel(
+       @VisibleForTesting
+       protected InputChannel createKnownInputChannel(
                        SingleInputGate inputGate,
                        int index,
                        NettyShuffleDescriptor inputChannelDescriptor,
@@ -243,7 +244,10 @@ public class SingleInputGateFactory {
                return () -> bufferPoolFactory.createBufferPool(0, 
floatingNetworkBuffersPerGate);
        }
 
-       private static class ChannelStatistics {
+       /**
+        * Statistics of input channels.
+        */
+       protected static class ChannelStatistics {
                int numLocalChannels;
                int numRemoteChannels;
                int numUnknownChannels;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
new file mode 100644
index 0000000..aa1a160
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import java.io.IOException;
+
+/**
+ * A benchmark-specific input gate factory which overrides the respective 
methods of creating
+ * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting 
specific subpartitions.
+ */
+public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory {
+
+       public SingleInputGateBenchmarkFactory(
+                       ResourceID taskExecutorResourceId,
+                       NettyShuffleEnvironmentConfiguration networkConfig,
+                       ConnectionManager connectionManager,
+                       ResultPartitionManager partitionManager,
+                       TaskEventPublisher taskEventPublisher,
+                       NetworkBufferPool networkBufferPool) {
+               super(
+                       taskExecutorResourceId,
+                       networkConfig,
+                       connectionManager,
+                       partitionManager,
+                       taskEventPublisher,
+                       networkBufferPool);
+       }
+
+       @Override
+       protected InputChannel createKnownInputChannel(
+                       SingleInputGate inputGate,
+                       int index,
+                       NettyShuffleDescriptor inputChannelDescriptor,
+                       SingleInputGateFactory.ChannelStatistics 
channelStatistics,
+                       InputChannelMetrics metrics) {
+               ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+               if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+                       return new TestLocalInputChannel(
+                               inputGate,
+                               index,
+                               partitionId,
+                               partitionManager,
+                               taskEventPublisher,
+                               partitionRequestInitialBackoff,
+                               partitionRequestMaxBackoff,
+                               metrics);
+               } else {
+                       return new TestRemoteInputChannel(
+                               inputGate,
+                               index,
+                               partitionId,
+                               inputChannelDescriptor.getConnectionId(),
+                               connectionManager,
+                               partitionRequestInitialBackoff,
+                               partitionRequestMaxBackoff,
+                               metrics,
+                               networkBufferPool);
+               }
+       }
+
+       /**
+        * A {@link LocalInputChannel} which ignores the given subpartition 
index and uses channel index
+        * instead when requesting subpartition.
+        */
+       static class TestLocalInputChannel extends LocalInputChannel {
+
+               private final ResultPartitionID newPartitionID = new 
ResultPartitionID();
+
+               public TestLocalInputChannel(
+                               SingleInputGate inputGate,
+                               int channelIndex,
+                               ResultPartitionID partitionId,
+                               ResultPartitionManager partitionManager,
+                               TaskEventPublisher taskEventPublisher,
+                               int initialBackoff,
+                               int maxBackoff,
+                               InputChannelMetrics metrics) {
+                       super(
+                               inputGate,
+                               channelIndex,
+                               partitionId,
+                               partitionManager,
+                               taskEventPublisher,
+                               initialBackoff,
+                               maxBackoff,
+                               metrics);
+               }
+
+               @Override
+               public void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException {
+                       super.requestSubpartition(channelIndex);
+               }
+
+               @Override
+               public ResultPartitionID getPartitionId() {
+                       // the SingleInputGate assumes that all InputChannels 
are consuming different ResultPartition
+                       // so can be distinguished by ResultPartitionID. 
However, the micro benchmark breaks this and
+                       // all InputChannels in a SingleInputGate consume data 
from the same ResultPartition. To make
+                       // it transparent to SingleInputGate, a new and unique 
ResultPartitionID is returned here
+                       return newPartitionID;
+               }
+       }
+
+       /**
+        * A {@link RemoteInputChannel} which ignores the given subpartition 
index and uses channel index
+        * instead when requesting subpartition.
+        */
+       static class TestRemoteInputChannel extends RemoteInputChannel {
+
+               private final ResultPartitionID newPartitionID = new 
ResultPartitionID();
+
+               public TestRemoteInputChannel(
+                               SingleInputGate inputGate,
+                               int channelIndex,
+                               ResultPartitionID partitionId,
+                               ConnectionID connectionId,
+                               ConnectionManager connectionManager,
+                               int initialBackOff,
+                               int maxBackoff,
+                               InputChannelMetrics metrics,
+                               MemorySegmentProvider memorySegmentProvider) {
+                       super(
+                               inputGate,
+                               channelIndex,
+                               partitionId,
+                               connectionId,
+                               connectionManager,
+                               initialBackOff,
+                               maxBackoff,
+                               metrics,
+                               memorySegmentProvider);
+               }
+
+               @Override
+               public void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException {
+                       super.requestSubpartition(channelIndex);
+               }
+
+               @Override
+               public ResultPartitionID getPartitionId() {
+                       // the SingleInputGate assumes that all InputChannels 
are consuming different ResultPartition
+                       // so can be distinguished by ResultPartitionID. 
However, the micro benchmark breaks this and
+                       // all InputChannels in a SingleInputGate consume data 
from the same ResultPartition. To make
+                       // it transparent to SingleInputGate, a new and unique 
ResultPartitionID is returned here
+                       return newPartitionID;
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index aaeb1b0..60a4ba2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -50,7 +50,6 @@ import 
org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Arrays;
 
 import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
 
@@ -142,7 +141,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        receiverEnv.start();
                }
 
-               gateFactory = new SingleInputGateFactory(
+               gateFactory = new SingleInputGateBenchmarkFactory(
                        location,
                        receiverEnv.getConfiguration(),
                        receiverEnv.getConnectionManager(),
@@ -158,6 +157,12 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                suppressExceptions(receiverEnv::close);
        }
 
+       /**
+        * Note: It should be guaranteed that {@link 
#createResultPartitionWriter(int)} has been
+        * called before creating the receiver. Otherwise it might cause 
unexpected behaviors when
+        * {@link 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException} 
happens
+        * in {@link SingleInputGateBenchmarkFactory.TestRemoteInputChannel}.
+        */
        public SerializingLongReceiver createReceiver() throws Exception {
                TaskManagerLocation senderLocation = new TaskManagerLocation(
                        ResourceID.generate(),
@@ -216,20 +221,20 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
        }
 
        private InputGate createInputGate(TaskManagerLocation senderLocation) 
throws Exception {
-               InputGate[] gates = new InputGate[channels];
-               for (int channel = 0; channel < channels; ++channel) {
+               InputGate[] gates = new InputGate[partitionIds.length];
+               for (int gateIndex = 0; gateIndex < gates.length; ++gateIndex) {
                        final InputGateDeploymentDescriptor gateDescriptor = 
createInputGateDeploymentDescriptor(
                                senderLocation,
-                               channel,
+                               gateIndex,
                                location);
 
-                       final InputGate gate = 
createInputGateWithMetrics(gateFactory, gateDescriptor, channel);
+                       final InputGate gate = 
createInputGateWithMetrics(gateFactory, gateDescriptor, gateIndex);
 
                        gate.setup();
-                       gates[channel] = gate;
+                       gates[gateIndex] = gate;
                }
 
-               if (channels > 1) {
+               if (gates.length > 1) {
                        return new UnionInputGate(gates);
                } else {
                        return gates[0];
@@ -238,18 +243,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
 
        private InputGateDeploymentDescriptor 
createInputGateDeploymentDescriptor(
                        TaskManagerLocation senderLocation,
-                       int consumedSubpartitionIndex,
+                       int gateIndex,
                        ResourceID localLocation) {
 
-               final ShuffleDescriptor[] channelDescriptors = 
Arrays.stream(partitionIds)
-                       .map(partitionId ->
-                               createShuffleDescriptor(localMode, partitionId, 
localLocation, senderLocation, consumedSubpartitionIndex))
-                       .toArray(ShuffleDescriptor[]::new);
+               final ShuffleDescriptor[] channelDescriptors = new 
ShuffleDescriptor[channels];
+               for (int channelIndex = 0; channelIndex < channels; 
++channelIndex) {
+                       channelDescriptors[channelIndex] = 
createShuffleDescriptor(
+                               localMode, partitionIds[gateIndex], 
localLocation, senderLocation, channelIndex);
+               }
 
                return new InputGateDeploymentDescriptor(
                        dataSetID,
                        ResultPartitionType.PIPELINED_BOUNDED,
-                       consumedSubpartitionIndex,
+                       // 0 is used because TestRemoteInputChannel and 
TestLocalInputChannel will
+                       // ignore this and use channelIndex instead when 
requesting a subpartition
+                       0,
                        channelDescriptors);
        }
 
@@ -272,11 +280,11 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        ResultPartitionID resultPartitionID,
                        ResourceID location,
                        TaskManagerLocation senderLocation,
-                       int channel) {
+                       int connectionIndex) {
                final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
                        .setId(resultPartitionID)
                        .setProducerInfoFromTaskManagerLocation(senderLocation)
-                       .setConnectionIndex(channel);
+                       .setConnectionIndex(connectionIndex);
                return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
        }
 }

Reply via email to