azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290385492
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
##########
@@ -99,110 +96,161 @@ public SingleInputGate create(
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull PartitionProducerStateProvider
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
- final IntermediateDataSetID consumedResultId =
checkNotNull(igdd.getConsumedResultId());
- final ResultPartitionType consumedPartitionType =
checkNotNull(igdd.getConsumedPartitionType());
-
- final int consumedSubpartitionIndex =
igdd.getConsumedSubpartitionIndex();
- checkArgument(consumedSubpartitionIndex >= 0);
-
- final InputChannelDeploymentDescriptor[] icdd =
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+ SupplierWithException<BufferPool, IOException>
bufferPoolFactory = createBufferPoolFactory(
+ networkBufferPool,
+ isCreditBased,
+ networkBuffersPerChannel,
+ floatingNetworkBuffersPerGate,
+ igdd.getInputChannelDescriptors().length,
+ igdd.getConsumedPartitionType());
- final SingleInputGate inputGate = new SingleInputGate(
+ SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
- consumedResultId,
- consumedPartitionType,
- consumedSubpartitionIndex,
- icdd.length,
+ igdd.getConsumedResultId(),
+ igdd.getConsumedPartitionType(),
+ igdd.getConsumedSubpartitionIndex(),
+ igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
isCreditBased,
- createBufferPoolFactory(icdd.length,
consumedPartitionType));
+ bufferPoolFactory);
+
+ createInputChannels(owningTaskName, igdd, inputGate, metrics);
+ return inputGate;
+ }
+
+ private void createInputChannels(
+ String owningTaskName,
+ InputGateDeploymentDescriptor
inputGateDeploymentDescriptor,
+ SingleInputGate inputGate,
+ InputChannelMetrics metrics) {
+ ShuffleDescriptor[] inputChannelDescriptors =
+
inputGateDeploymentDescriptor.getInputChannelDescriptors();
// Create the input channels. There is one input channel for
each consumed partition.
- final InputChannel[] inputChannels = new
InputChannel[icdd.length];
+ InputChannel[] inputChannels = new
InputChannel[inputChannelDescriptors.length];
- int numLocalChannels = 0;
- int numRemoteChannels = 0;
- int numUnknownChannels = 0;
+ ChannelStatistics channelStatistics = new ChannelStatistics();
for (int i = 0; i < inputChannels.length; i++) {
- final ResultPartitionID partitionId =
icdd[i].getConsumedPartitionId();
- final ResultPartitionLocation partitionLocation =
icdd[i].getConsumedPartitionLocation();
-
- if (partitionLocation.isLocal()) {
- inputChannels[i] = new
LocalInputChannel(inputGate, i, partitionId,
- partitionManager,
- taskEventPublisher,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics);
-
- numLocalChannels++;
- }
- else if (partitionLocation.isRemote()) {
- inputChannels[i] = new
RemoteInputChannel(inputGate, i, partitionId,
- partitionLocation.getConnectionId(),
- connectionManager,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics,
- networkBufferPool);
-
- numRemoteChannels++;
- }
- else if (partitionLocation.isUnknown()) {
- inputChannels[i] = new
UnknownInputChannel(inputGate, i, partitionId,
- partitionManager,
- taskEventPublisher,
- connectionManager,
- partitionRequestInitialBackoff,
- partitionRequestMaxBackoff,
- metrics,
- networkBufferPool);
-
- numUnknownChannels++;
- }
- else {
- throw new IllegalStateException("Unexpected
partition location.");
- }
-
- inputGate.setInputChannel(partitionId.getPartitionId(),
inputChannels[i]);
+ inputChannels[i] = createInputChannel(
+ inputGate,
+ i,
+
inputGateDeploymentDescriptor.getConsumerLocation(),
+ inputChannelDescriptors[i],
+ channelStatistics,
+ metrics);
+ ResultPartitionID resultPartitionID =
inputChannels[i].getPartitionId();
+
inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
}
- LOG.debug("{}: Created {} input channels (local: {}, remote:
{}, unknown: {}).",
+ LOG.debug("{}: Created {} input channels ({}).",
owningTaskName,
inputChannels.length,
- numLocalChannels,
- numRemoteChannels,
- numUnknownChannels);
+ channelStatistics);
+ }
- return inputGate;
+ private InputChannel createInputChannel(
+ SingleInputGate inputGate,
+ int index,
+ ResourceID consumerLocation,
+ ShuffleDescriptor inputChannelDescriptor,
+ ChannelStatistics channelStatistics,
+ InputChannelMetrics metrics) {
+ if (inputChannelDescriptor.isUnknown()) {
+ ResultPartitionID partitionId =
inputChannelDescriptor.getResultPartitionID();
+ channelStatistics.numUnknownChannels++;
+ return new UnknownInputChannel(
+ inputGate,
+ index,
+ partitionId,
+ partitionManager,
+ taskEventPublisher,
+ connectionManager,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ metrics,
+ networkBufferPool);
+ } else if (inputChannelDescriptor instanceof
NettyShuffleDescriptor) {
+ return createKnownInputChannel(
+ inputGate,
+ index,
+ consumerLocation,
+ (NettyShuffleDescriptor) inputChannelDescriptor,
+ channelStatistics,
+ metrics);
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Default network shuffle service: unsupported
ShuffleDescriptor <%s>",
+ inputChannelDescriptor.getClass().getName()));
+ }
}
- private SupplierWithException<BufferPool, IOException>
createBufferPoolFactory(int size, ResultPartitionType type) {
- return createBufferPoolFactory(
- networkBufferPool, isCreditBased,
networkBuffersPerChannel, floatingNetworkBuffersPerGate, size, type);
+ private InputChannel createKnownInputChannel(
+ SingleInputGate inputGate,
+ int index,
+ ResourceID consumerLocation,
+ NettyShuffleDescriptor inputChannelDescriptor,
+ ChannelStatistics channelStatistics,
+ InputChannelMetrics metrics) {
+ ResultPartitionID partitionId =
inputChannelDescriptor.getResultPartitionID();
+ if (inputChannelDescriptor.isLocalTo(consumerLocation)) {
+ // Consuming task is deployed to the same TaskManager
as the partition => local
+ channelStatistics.numLocalChannels++;
+ return new LocalInputChannel(
+ inputGate,
+ index,
+ partitionId,
+ partitionManager,
+ taskEventPublisher,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ metrics);
+ } else {
+ // Different instances => remote
+ channelStatistics.numRemoteChannels++;
+ return new RemoteInputChannel(
+ inputGate,
+ index,
+ partitionId,
+ inputChannelDescriptor.getConnectionId(),
+ connectionManager,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ metrics,
+ networkBufferPool);
+ }
Review comment:
True, I also think this is for a followup.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services