This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 739c496d2d5a9d12cc7ee7b021fed4a9cd0499ed Author: Zhijiang <[email protected]> AuthorDate: Thu Jun 27 11:56:06 2019 +0800 [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create The ResultPartitionID could be got directly from ResultPartitionDeploymentDescriptor, so it is no need to pass ExecutionAttemptID to construct new ResultPartitionID during creating ResultPartition in factory. --- .../org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java | 5 +---- .../flink/runtime/io/network/partition/ResultPartitionFactory.java | 4 +--- .../runtime/io/network/partition/ResultPartitionFactoryTest.java | 3 +-- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index 1565ed8..5171d75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -187,10 +187,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()]; int counter = 0; for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) { - resultPartitions[counter++] = resultPartitionFactory.create( - ownerContext.getOwnerName(), - ownerContext.getExecutionAttemptID(), - rpdd); + resultPartitions[counter++] = resultPartitionFactory.create(ownerContext.getOwnerName(), rpdd); } registerOutputMetrics(config.isNetworkDetailedMetrics(), ownerContext.getOutputGroup(), resultPartitions); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 9479653..6fc5cfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -73,12 +72,11 @@ public class ResultPartitionFactory { public ResultPartition create( @Nonnull String taskNameWithSubtaskAndId, - @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull ResultPartitionDeploymentDescriptor desc) { return create( taskNameWithSubtaskAndId, - new ResultPartitionID(desc.getPartitionId(), executionAttemptID), + desc.getShuffleDescriptor().getResultPartitionID(), desc.getPartitionType(), desc.getNumberOfSubpartitions(), desc.getMaxParallelism(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java index 8e95f8a..0c6848b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -74,6 +73,6 @@ public class ResultPartitionFactoryTest extends TestLogger { releaseType ); - return factory.create("test", new ExecutionAttemptID(), descriptor); + return factory.create("test", descriptor); } }
