zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289698969
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
##########
@@ -101,111 +98,158 @@ public SingleInputGate create(
@Nonnull PartitionProducerStateProvider
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics,
@Nonnull Counter numBytesInCounter) {
- final IntermediateDataSetID consumedResultId =
checkNotNull(igdd.getConsumedResultId());
- final ResultPartitionType consumedPartitionType =
checkNotNull(igdd.getConsumedPartitionType());
-
- final int consumedSubpartitionIndex =
igdd.getConsumedSubpartitionIndex();
- checkArgument(consumedSubpartitionIndex >= 0);
-
- final InputChannelDeploymentDescriptor[] icdd =
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
-
- final SingleInputGate inputGate = new SingleInputGate(
+ SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
- consumedResultId,
- consumedPartitionType,
- consumedSubpartitionIndex,
- icdd.length,
+ igdd.getConsumedResultId(),
+ igdd.getConsumedPartitionType(),
+ igdd.getConsumedSubpartitionIndex(),
+ igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
numBytesInCounter,
isCreditBased,
- createBufferPoolFactory(icdd.length,
consumedPartitionType));
+
createBufferPoolFactory(igdd.getInputChannelDescriptors().length,
igdd.getConsumedPartitionType()));
+ createInputChannels(owningTaskName, igdd, inputGate, metrics);
+ return inputGate;
+ }
+
+ private void createInputChannels(
+ String owningTaskName,
+ InputGateDeploymentDescriptor
inputGateDeploymentDescriptor,
+ SingleInputGate inputGate,
+ InputChannelMetrics metrics) {
+ ShuffleDescriptor[] inputChannelDescriptors =
+
inputGateDeploymentDescriptor.getInputChannelDescriptors();
// Create the input channels. There is one input channel for
each consumed partition.
- final InputChannel[] inputChannels = new
InputChannel[icdd.length];
+ InputChannel[] inputChannels = new
InputChannel[inputChannelDescriptors.length];
- int numLocalChannels = 0;
- int numRemoteChannels = 0;
- int numUnknownChannels = 0;
+ ChannelStatistics channelStatistics = new ChannelStatistics();
for (int i = 0; i < inputChannels.length; i++) {
- final ResultPartitionID partitionId =
icdd[i].getConsumedPartitionId();
- final ResultPartitionLocation partitionLocation =
icdd[i].getConsumedPartitionLocation();
-
- if (partitionLocation.isLocal()) {
- inputChannels[i] = new
LocalInputChannel(inputGate, i, partitionId,
- partitionManager,
- taskEventPublisher,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics);
-
- numLocalChannels++;
- }
- else if (partitionLocation.isRemote()) {
- inputChannels[i] = new
RemoteInputChannel(inputGate, i, partitionId,
- partitionLocation.getConnectionId(),
- connectionManager,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics,
- networkBufferPool);
-
- numRemoteChannels++;
- }
- else if (partitionLocation.isUnknown()) {
- inputChannels[i] = new
UnknownInputChannel(inputGate, i, partitionId,
- partitionManager,
- taskEventPublisher,
- connectionManager,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics,
- networkBufferPool);
-
- numUnknownChannels++;
- }
- else {
- throw new IllegalStateException("Unexpected
partition location.");
- }
-
- inputGate.setInputChannel(partitionId.getPartitionId(),
inputChannels[i]);
+ inputChannels[i] = createInputChannel(
+ inputGate,
+ i,
+
inputGateDeploymentDescriptor.getConsumerLocation(),
+ inputChannelDescriptors[i],
+ channelStatistics,
+ metrics);
+ ResultPartitionID resultPartitionID =
inputChannels[i].getPartitionId();
+
inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
}
LOG.debug("{}: Created {} input channels (local: {}, remote:
{}, unknown: {}).",
owningTaskName,
inputChannels.length,
- numLocalChannels,
- numRemoteChannels,
- numUnknownChannels);
+ channelStatistics.numLocalChannels,
Review comment:
Maybe define `ChannelStatistics.toString()` and log the statistics object directly, instead of passing each counter as a separate log argument?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services