zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289702541
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
##########
@@ -101,111 +98,158 @@ public SingleInputGate create(
@Nonnull PartitionProducerStateProvider
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics,
@Nonnull Counter numBytesInCounter) {
- final IntermediateDataSetID consumedResultId =
checkNotNull(igdd.getConsumedResultId());
- final ResultPartitionType consumedPartitionType =
checkNotNull(igdd.getConsumedPartitionType());
-
- final int consumedSubpartitionIndex =
igdd.getConsumedSubpartitionIndex();
- checkArgument(consumedSubpartitionIndex >= 0);
-
- final InputChannelDeploymentDescriptor[] icdd =
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
-
- final SingleInputGate inputGate = new SingleInputGate(
+ SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
- consumedResultId,
- consumedPartitionType,
- consumedSubpartitionIndex,
- icdd.length,
+ igdd.getConsumedResultId(),
+ igdd.getConsumedPartitionType(),
+ igdd.getConsumedSubpartitionIndex(),
+ igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
numBytesInCounter,
isCreditBased,
- createBufferPoolFactory(icdd.length,
consumedPartitionType));
+
createBufferPoolFactory(igdd.getInputChannelDescriptors().length,
igdd.getConsumedPartitionType()));
+ createInputChannels(owningTaskName, igdd, inputGate, metrics);
+ return inputGate;
+ }
+
+ private void createInputChannels(
+ String owningTaskName,
+ InputGateDeploymentDescriptor
inputGateDeploymentDescriptor,
+ SingleInputGate inputGate,
+ InputChannelMetrics metrics) {
+ ShuffleDescriptor[] inputChannelDescriptors =
+
inputGateDeploymentDescriptor.getInputChannelDescriptors();
// Create the input channels. There is one input channel for
each consumed partition.
- final InputChannel[] inputChannels = new
InputChannel[icdd.length];
+ InputChannel[] inputChannels = new
InputChannel[inputChannelDescriptors.length];
- int numLocalChannels = 0;
- int numRemoteChannels = 0;
- int numUnknownChannels = 0;
+ ChannelStatistics channelStatistics = new ChannelStatistics();
for (int i = 0; i < inputChannels.length; i++) {
- final ResultPartitionID partitionId =
icdd[i].getConsumedPartitionId();
- final ResultPartitionLocation partitionLocation =
icdd[i].getConsumedPartitionLocation();
-
- if (partitionLocation.isLocal()) {
- inputChannels[i] = new
LocalInputChannel(inputGate, i, partitionId,
- partitionManager,
- taskEventPublisher,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics);
-
- numLocalChannels++;
- }
- else if (partitionLocation.isRemote()) {
- inputChannels[i] = new
RemoteInputChannel(inputGate, i, partitionId,
- partitionLocation.getConnectionId(),
- connectionManager,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics,
- networkBufferPool);
-
- numRemoteChannels++;
- }
- else if (partitionLocation.isUnknown()) {
- inputChannels[i] = new
UnknownInputChannel(inputGate, i, partitionId,
- partitionManager,
- taskEventPublisher,
- connectionManager,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics,
- networkBufferPool);
-
- numUnknownChannels++;
- }
- else {
- throw new IllegalStateException("Unexpected
partition location.");
- }
-
- inputGate.setInputChannel(partitionId.getPartitionId(),
inputChannels[i]);
+ inputChannels[i] = createInputChannel(
+ inputGate,
+ i,
+
inputGateDeploymentDescriptor.getConsumerLocation(),
+ inputChannelDescriptors[i],
+ channelStatistics,
+ metrics);
+ ResultPartitionID resultPartitionID =
inputChannels[i].getPartitionId();
+
inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
}
LOG.debug("{}: Created {} input channels (local: {}, remote:
{}, unknown: {}).",
owningTaskName,
inputChannels.length,
- numLocalChannels,
- numRemoteChannels,
- numUnknownChannels);
+ channelStatistics.numLocalChannels,
+ channelStatistics.numRemoteChannels,
+ channelStatistics.numUnknownChannels);
+ }
- return inputGate;
+ private InputChannel createInputChannel(
+ SingleInputGate inputGate,
+ int index,
+ ResourceID consumerLocation,
+ ShuffleDescriptor inputChannelDescriptor,
+ ChannelStatistics channelStatistics,
+ InputChannelMetrics metrics) {
+ if (inputChannelDescriptor.isUnknown()) {
+ ResultPartitionID partitionId =
inputChannelDescriptor.getResultPartitionID();
+ channelStatistics.numUnknownChannels++;
+ return new UnknownInputChannel(
+ inputGate,
+ index,
+ partitionId,
+ partitionManager,
+ taskEventPublisher,
+ connectionManager,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ metrics,
+ networkBufferPool);
+ } else if (inputChannelDescriptor instanceof
NettyShuffleDescriptor) {
+ return createKnownInputChannel(
+ inputGate,
+ index,
+ consumerLocation,
+ (NettyShuffleDescriptor) inputChannelDescriptor,
+ channelStatistics,
+ taskEventPublisher,
+ metrics);
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Default network shuffle service: unsupported
ShuffleDescriptor <%s>",
+ inputChannelDescriptor.getClass().getName()));
+ }
+ }
+
+ private InputChannel createKnownInputChannel(
+ SingleInputGate inputGate,
+ int index,
+ ResourceID consumerLocation,
+ NettyShuffleDescriptor inputChannelDescriptor,
+ ChannelStatistics channelStatistics,
+ TaskEventPublisher taskEventPublisher,
+ InputChannelMetrics metrics) {
+ ResultPartitionID partitionId =
inputChannelDescriptor.getResultPartitionID();
+ if (inputChannelDescriptor.isLocalTo(consumerLocation)) {
+ // Consuming task is deployed to the same TaskManager
as the partition => local
+ channelStatistics.numLocalChannels++;
+ return new LocalInputChannel(
+ inputGate,
+ index,
+ partitionId,
+ partitionManager,
+ taskEventPublisher,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ metrics);
+ } else {
+ // Different instances => remote
+ channelStatistics.numRemoteChannels++;
+ return new RemoteInputChannel(
+ inputGate,
+ index,
+ partitionId,
+ inputChannelDescriptor.getConnectionId(),
+ connectionManager,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ metrics,
+ networkBufferPool);
+ }
}
private SupplierWithException<BufferPool, IOException>
createBufferPoolFactory(int size, ResultPartitionType type) {
Review comment:
this method may be removed to replace by the following method directly?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services