This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6bc8399e7f1738ec22cb1082c096269b5106cee5 Author: kevin.cyj <[email protected]> AuthorDate: Mon Jul 5 14:21:02 2021 +0800 [FLINK-22674][runtime] Provide JobID when applying for shuffle resources by ShuffleMaster#registerPartitionWithProducer --- .../org/apache/flink/runtime/executiongraph/Execution.java | 3 ++- .../org/apache/flink/runtime/shuffle/NettyShuffleMaster.java | 5 ++++- .../java/org/apache/flink/runtime/shuffle/ShuffleMaster.java | 6 +++++- .../flink/runtime/deployment/ShuffleDescriptorTest.java | 12 ++++++++---- .../executiongraph/ExecutionPartitionLifecycleTest.java | 4 +++- .../network/partition/JobMasterPartitionTrackerImplTest.java | 4 +++- .../scheduler/SsgNetworkMemoryCalculationUtilsTest.java | 5 ++++- .../apache/flink/runtime/shuffle/TestingShuffleMaster.java | 5 ++++- 8 files changed, 33 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 58756e2..c74fb66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -460,7 +460,8 @@ public class Execution CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture = vertex.getExecutionGraphAccessor() .getShuffleMaster() - .registerPartitionWithProducer(partitionDescriptor, producerDescriptor); + .registerPartitionWithProducer( + vertex.getJobId(), partitionDescriptor, producerDescriptor); CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration = shuffleDescriptorFuture.thenApply( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java index 7b6a167..53ea1e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.shuffle; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; @@ -60,7 +61,9 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> @Override public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { ResultPartitionID resultPartitionID = new ResultPartitionID( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java index 735669f..b0220e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.shuffle; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.MemorySize; import java.util.Collection; @@ -39,6 +40,7 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> { * internally within the shuffle service. The descriptor should provide enough information to * read from or write data to the partition. * + * @param jobID job ID of the corresponding job which registered the partition * @param partitionDescriptor general job graph information about the partition * @param producerDescriptor general producer information (location, execution id, connection * info) @@ -46,7 +48,9 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> { * and their data exchange. */ CompletableFuture<T> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor); + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor); /** * Release any external resources occupied by the given partition. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java index 839d5cc..89cc242 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.deployment; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -54,18 +55,20 @@ public class ShuffleDescriptorTest extends TestLogger { @Test public void testMixedLocalRemoteUnknownDeployment() throws Exception { ResourceID consumerResourceID = ResourceID.generate(); + JobID jobID = new JobID(); // Local and remote channel are only allowed for certain execution // states. for (ExecutionState state : ExecutionState.values()) { ResultPartitionID localPartitionId = new ResultPartitionID(); ResultPartitionDeploymentDescriptor localPartition = - createResultPartitionDeploymentDescriptor(localPartitionId, consumerResourceID); + createResultPartitionDeploymentDescriptor( + jobID, localPartitionId, consumerResourceID); ResultPartitionID remotePartitionId = new ResultPartitionID(); ResultPartitionDeploymentDescriptor remotePartition = createResultPartitionDeploymentDescriptor( - remotePartitionId, ResourceID.generate()); + jobID, remotePartitionId, ResourceID.generate()); ResultPartitionID unknownPartitionId = new ResultPartitionID(); @@ -196,7 +199,7 @@ public class ShuffleDescriptorTest extends TestLogger { } private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor( - ResultPartitionID id, ResourceID location) + JobID jobID, ResultPartitionID id, ResourceID location) throws ExecutionException, InterruptedException { ProducerDescriptor producerDescriptor = new ProducerDescriptor( @@ -208,7 +211,8 @@ public class ShuffleDescriptorTest extends TestLogger { PartitionDescriptorBuilder.newBuilder().setPartitionId(id.getPartitionId()).build(); ShuffleDescriptor shuffleDescriptor = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER - .registerPartitionWithProducer(partitionDescriptor, producerDescriptor) + .registerPartitionWithProducer( + jobID, partitionDescriptor, producerDescriptor) .get(); return new ResultPartitionDeploymentDescriptor( partitionDescriptor, shuffleDescriptor, 1, true); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java index 2864160..3a01a40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -322,7 +322,9 @@ public class ExecutionPartitionLifecycleTest extends TestLogger { @Override public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { return CompletableFuture.completedFuture( new TestingShuffleDescriptor( partitionDescriptor.getPartitionId(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java index 67ac823..1f7fbb1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java @@ -317,7 +317,9 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { @Override public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { return null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java index 174304d..54ad8eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -130,7 +131,9 @@ public class SsgNetworkMemoryCalculationUtilsTest { private static class TestShuffleMaster implements ShuffleMaster<ShuffleDescriptor> { @Override public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { return null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java index c912953..0787718 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.shuffle; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -48,7 +49,9 @@ public class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> { @Override public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { if (throwExceptionalOnRegistration) { throw new RuntimeException("Forced partition registration failure"); } else if (autoCompleteRegistration) {
