This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6bc8399e7f1738ec22cb1082c096269b5106cee5
Author: kevin.cyj <[email protected]>
AuthorDate: Mon Jul 5 14:21:02 2021 +0800

    [FLINK-22674][runtime] Provide JobID when applying for shuffle resources by 
ShuffleMaster#registerPartitionWithProducer
---
 .../org/apache/flink/runtime/executiongraph/Execution.java   |  3 ++-
 .../org/apache/flink/runtime/shuffle/NettyShuffleMaster.java |  5 ++++-
 .../java/org/apache/flink/runtime/shuffle/ShuffleMaster.java |  6 +++++-
 .../flink/runtime/deployment/ShuffleDescriptorTest.java      | 12 ++++++++----
 .../executiongraph/ExecutionPartitionLifecycleTest.java      |  4 +++-
 .../network/partition/JobMasterPartitionTrackerImplTest.java |  4 +++-
 .../scheduler/SsgNetworkMemoryCalculationUtilsTest.java      |  5 ++++-
 .../apache/flink/runtime/shuffle/TestingShuffleMaster.java   |  5 ++++-
 8 files changed, 33 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 58756e2..c74fb66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -460,7 +460,8 @@ public class Execution
             CompletableFuture<? extends ShuffleDescriptor> 
shuffleDescriptorFuture =
                     vertex.getExecutionGraphAccessor()
                             .getShuffleMaster()
-                            
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
+                            .registerPartitionWithProducer(
+                                    vertex.getJobId(), partitionDescriptor, 
producerDescriptor);
 
             CompletableFuture<ResultPartitionDeploymentDescriptor> 
partitionRegistration =
                     shuffleDescriptorFuture.thenApply(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 7b6a167..53ea1e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.shuffle;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
@@ -60,7 +61,9 @@ public class NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
 
     @Override
     public CompletableFuture<NettyShuffleDescriptor> 
registerPartitionWithProducer(
-            PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+            JobID jobID,
+            PartitionDescriptor partitionDescriptor,
+            ProducerDescriptor producerDescriptor) {
 
         ResultPartitionID resultPartitionID =
                 new ResultPartitionID(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index 735669f..b0220e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.shuffle;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.MemorySize;
 
 import java.util.Collection;
@@ -39,6 +40,7 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
      * internally within the shuffle service. The descriptor should provide 
enough information to
      * read from or write data to the partition.
      *
+     * @param jobID job ID of the corresponding job which registered the 
partition
      * @param partitionDescriptor general job graph information about the 
partition
      * @param producerDescriptor general producer information (location, 
execution id, connection
      *     info)
@@ -46,7 +48,9 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
      *     and their data exchange.
      */
     CompletableFuture<T> registerPartitionWithProducer(
-            PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor);
+            JobID jobID,
+            PartitionDescriptor partitionDescriptor,
+            ProducerDescriptor producerDescriptor);
 
     /**
      * Release any external resources occupied by the given partition.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
index 839d5cc..89cc242 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -54,18 +55,20 @@ public class ShuffleDescriptorTest extends TestLogger {
     @Test
     public void testMixedLocalRemoteUnknownDeployment() throws Exception {
         ResourceID consumerResourceID = ResourceID.generate();
+        JobID jobID = new JobID();
 
         // Local and remote channel are only allowed for certain execution
         // states.
         for (ExecutionState state : ExecutionState.values()) {
             ResultPartitionID localPartitionId = new ResultPartitionID();
             ResultPartitionDeploymentDescriptor localPartition =
-                    
createResultPartitionDeploymentDescriptor(localPartitionId, consumerResourceID);
+                    createResultPartitionDeploymentDescriptor(
+                            jobID, localPartitionId, consumerResourceID);
 
             ResultPartitionID remotePartitionId = new ResultPartitionID();
             ResultPartitionDeploymentDescriptor remotePartition =
                     createResultPartitionDeploymentDescriptor(
-                            remotePartitionId, ResourceID.generate());
+                            jobID, remotePartitionId, ResourceID.generate());
 
             ResultPartitionID unknownPartitionId = new ResultPartitionID();
 
@@ -196,7 +199,7 @@ public class ShuffleDescriptorTest extends TestLogger {
     }
 
     private static ResultPartitionDeploymentDescriptor 
createResultPartitionDeploymentDescriptor(
-            ResultPartitionID id, ResourceID location)
+            JobID jobID, ResultPartitionID id, ResourceID location)
             throws ExecutionException, InterruptedException {
         ProducerDescriptor producerDescriptor =
                 new ProducerDescriptor(
@@ -208,7 +211,8 @@ public class ShuffleDescriptorTest extends TestLogger {
                 
PartitionDescriptorBuilder.newBuilder().setPartitionId(id.getPartitionId()).build();
         ShuffleDescriptor shuffleDescriptor =
                 ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER
-                        .registerPartitionWithProducer(partitionDescriptor, 
producerDescriptor)
+                        .registerPartitionWithProducer(
+                                jobID, partitionDescriptor, producerDescriptor)
                         .get();
         return new ResultPartitionDeploymentDescriptor(
                 partitionDescriptor, shuffleDescriptor, 1, true);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index 2864160..3a01a40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -322,7 +322,9 @@ public class ExecutionPartitionLifecycleTest extends 
TestLogger {
 
         @Override
         public CompletableFuture<ShuffleDescriptor> 
registerPartitionWithProducer(
-                PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+                JobID jobID,
+                PartitionDescriptor partitionDescriptor,
+                ProducerDescriptor producerDescriptor) {
             return CompletableFuture.completedFuture(
                     new TestingShuffleDescriptor(
                             partitionDescriptor.getPartitionId(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
index 67ac823..1f7fbb1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
@@ -317,7 +317,9 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
 
         @Override
         public CompletableFuture<ShuffleDescriptor> 
registerPartitionWithProducer(
-                PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+                JobID jobID,
+                PartitionDescriptor partitionDescriptor,
+                ProducerDescriptor producerDescriptor) {
             return null;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
index 174304d..54ad8eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -130,7 +131,9 @@ public class SsgNetworkMemoryCalculationUtilsTest {
     private static class TestShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
         @Override
         public CompletableFuture<ShuffleDescriptor> 
registerPartitionWithProducer(
-                PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+                JobID jobID,
+                PartitionDescriptor partitionDescriptor,
+                ProducerDescriptor producerDescriptor) {
             return null;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java
index c912953..0787718 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.shuffle;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -48,7 +49,9 @@ public class TestingShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
 
     @Override
     public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
-            PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+            JobID jobID,
+            PartitionDescriptor partitionDescriptor,
+            ProducerDescriptor producerDescriptor) {
         if (throwExceptionalOnRegistration) {
             throw new RuntimeException("Forced partition registration 
failure");
         } else if (autoCompleteRegistration) {

Reply via email to