This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 739c496d2d5a9d12cc7ee7b021fed4a9cd0499ed
Author: Zhijiang <[email protected]>
AuthorDate: Thu Jun 27 11:56:06 2019 +0800

    [FLINK-12882][network] Remove ExecutionAttemptID argument from 
ResultPartitionFactory#create
    
    The ResultPartitionID can be obtained directly from 
ResultPartitionDeploymentDescriptor, so there is no need to pass
    ExecutionAttemptID to construct a new ResultPartitionID when creating a 
ResultPartition in the factory.
---
 .../org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java | 5 +----
 .../flink/runtime/io/network/partition/ResultPartitionFactory.java   | 4 +---
 .../runtime/io/network/partition/ResultPartitionFactoryTest.java     | 3 +--
 3 files changed, 3 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 1565ed8..5171d75 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -187,10 +187,7 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment<ResultPartiti
                        ResultPartition[] resultPartitions = new 
ResultPartition[resultPartitionDeploymentDescriptors.size()];
                        int counter = 0;
                        for (ResultPartitionDeploymentDescriptor rpdd : 
resultPartitionDeploymentDescriptors) {
-                               resultPartitions[counter++] = 
resultPartitionFactory.create(
-                                       ownerContext.getOwnerName(),
-                                       ownerContext.getExecutionAttemptID(),
-                                       rpdd);
+                               resultPartitions[counter++] = 
resultPartitionFactory.create(ownerContext.getOwnerName(), rpdd);
                        }
 
                        
registerOutputMetrics(config.isNetworkDetailedMetrics(), 
ownerContext.getOutputGroup(), resultPartitions);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 9479653..6fc5cfc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -73,12 +72,11 @@ public class ResultPartitionFactory {
 
        public ResultPartition create(
                @Nonnull String taskNameWithSubtaskAndId,
-               @Nonnull ExecutionAttemptID executionAttemptID,
                @Nonnull ResultPartitionDeploymentDescriptor desc) {
 
                return create(
                        taskNameWithSubtaskAndId,
-                       new ResultPartitionID(desc.getPartitionId(), 
executionAttemptID),
+                       desc.getShuffleDescriptor().getResultPartitionID(),
                        desc.getPartitionType(),
                        desc.getNumberOfSubpartitions(),
                        desc.getMaxParallelism(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 8e95f8a..0c6848b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -74,6 +73,6 @@ public class ResultPartitionFactoryTest extends TestLogger {
                        releaseType
                );
 
-               return factory.create("test", new ExecutionAttemptID(), 
descriptor);
+               return factory.create("test", descriptor);
        }
 }

Reply via email to