zhuzhurk commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r886299714


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.types.IntValue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for reusing persisted intermediate dataset */
+public class JobIntermediateDatasetReuseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);
+
+    @Test
+    public void testClusterPartitionReuse() throws Exception {
+        final TestingMiniClusterConfiguration miniClusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder().build();
+
+        try (TestingMiniCluster miniCluster =
+                
TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+            miniCluster.start();
+
+            IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
+            final JobGraph firstJobGraph = createFirstJobGraph(1, 
intermediateDataSetID);
+            miniCluster.submitJob(firstJobGraph).get();
+            CompletableFuture<JobResult> jobResultFuture =
+                    miniCluster.requestJobResult(firstJobGraph.getJobID());
+            JobResult jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+
+            final JobGraph secondJobGraph = createSecondJobGraph(1, 
intermediateDataSetID);
+            miniCluster.submitJob(secondJobGraph).get();
+            jobResultFuture = 
miniCluster.requestJobResult(secondJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+        }
+    }
+
+    @Test
+    public void testClusterPartitionReuseMultipleParallelism() throws 
Exception {

Review Comment:
   It looks to me like the only differences between these tests are the 
parallelism and the result verification.
   Can we extract the shared logic into a method and let each test just 
invoke that method with a different parallelism and verification implementation?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -568,6 +583,14 @@ public void setResultOptimizerProperties(String 
resultOptimizerProperties) {
         this.resultOptimizerProperties = resultOptimizerProperties;
     }
 
+    public void addIntermediateDataSetIdToConsume(IntermediateDataSetID 
intermediateDataSetId) {
+        intermediateDataSetIdsToConsume.add(intermediateDataSetId);
+    }
+
+    public List<IntermediateDataSetID> getIntermediateDataSetIdToConsume() {

Review Comment:
   getIntermediateDataSetIdToConsume -> getIntermediateDataSetIdsToConsume



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##########
@@ -120,6 +125,26 @@ public Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedPartitions()
         return 
partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList());
     }
 
+    @Override
+    public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
+        this.resourceManagerGateway = resourceManagerGateway;
+    }
+
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        Preconditions.checkNotNull(
+                resourceManagerGateway, "JobMaster is not connected to 
ResourceManager");
+        try {
+            return this.resourceManagerGateway

Review Comment:
   Can we add a cache for it? Otherwise it can result in thousands of RPCs when 
deploying a large-scale job vertex.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -244,7 +266,50 @@ public static TaskDeploymentDescriptorFactory 
fromExecutionVertex(
                 
internalExecutionGraphAccessor.getPartitionLocationConstraint(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                clusterPartitionShuffleDescriptors);
+    }
+
+    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+            getClusterPartitionShuffleDescriptors(ExecutionVertex 
executionVertex) {
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+                executionVertex.getExecutionGraphAccessor();
+        final List<IntermediateDataSetID> consumedClusterDataSetIds =
+                
executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdToConsume();
+        Map<IntermediateDataSetID, ShuffleDescriptor[]> 
clusterPartitionShuffleDescriptors =
+                new HashMap<>();
+
+        for (IntermediateDataSetID consumedClusterDataSetId : 
consumedClusterDataSetIds) {
+            Collection<? extends ShuffleDescriptor> shuffleDescriptors =
+                    
internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
+                            consumedClusterDataSetId);
+
+            Preconditions.checkState(
+                    executionVertex.getTotalNumberOfParallelSubtasks() == 
shuffleDescriptors.size(),
+                    "The parallelism (%s) of the cache consuming job vertex is 
"
+                            + "different from the number of shuffle 
descriptors (%s) of the intermediate data set",
+                    executionVertex.getTotalNumberOfParallelSubtasks(),
+                    shuffleDescriptors.size());
+
+            shuffleDescriptors =

Review Comment:
   I would add the assumption that the ShuffleMaster returns the 
shuffleDescriptors in order. Then we can reduce the complexity to O(1) by 
fetching the single required shuffle descriptor via its index. Otherwise each 
lookup has O(N) complexity, which adds up to O(N^2) when deploying N 
execution vertices.
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java:
##########
@@ -42,6 +44,7 @@ public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVert
 
     private final Map<IntermediateResultPartitionID, 
TestingSchedulingResultPartition>
             resultPartitionsById;
+    private final List<IntermediateDataSetID> cachedIntermediateDataSetID;

Review Comment:
   cachedIntermediateDataSetID -> consumedCachedIntermediateDataSetId
   
   As explained before, we need to make clear whether it is a produced partition 
or a consumed partition, to avoid confusion.
   Similar changes should also be applied to the surrounding code.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java:
##########
@@ -131,6 +147,7 @@ public static class Builder {
         private final Map<IntermediateResultPartitionID, 
TestingSchedulingResultPartition>
                 resultPartitionsById = new HashMap<>();
         private ExecutionState executionState = ExecutionState.CREATED;
+        private final List<IntermediateDataSetID> cachedIntermediateDataset = 
new ArrayList<>();

Review Comment:
   Dataset -> DataSet 
   
   To be aligned with others.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -38,11 +39,16 @@ public class ConsumedPartitionGroup implements 
Iterable<IntermediateResultPartit
 
     private final IntermediateDataSetID intermediateDataSetID;
 
-    private ConsumedPartitionGroup(List<IntermediateResultPartitionID> 
resultPartitions) {
+    private final ResultPartitionType resultPartitionType;
+
+    private ConsumedPartitionGroup(
+            List<IntermediateResultPartitionID> resultPartitions,
+            ResultPartitionType resultPartitionType) {
         checkArgument(
                 resultPartitions.size() > 0,
                 "The size of result partitions in the ConsumedPartitionGroup 
should be larger than 0.");
         this.intermediateDataSetID = 
resultPartitions.get(0).getIntermediateDataSetID();
+        this.resultPartitionType = resultPartitionType;

Review Comment:
   better to `checkNotNull`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java:
##########
@@ -177,6 +177,15 @@ public TestingSchedulingExecutionVertex newExecutionVertex(
         return newVertex;
     }
 
+    public TestingSchedulingExecutionVertex newExecutionVertex(

Review Comment:
   maybe `newExecutionVertexToConsumeCachedIntermediateDataSet`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -137,6 +148,16 @@ private List<InputGateDeploymentDescriptor> 
createInputGateDeploymentDescriptors
                                     consumedIntermediateResult, 
consumedPartitionGroup)));
         }
 
+        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry :
+                consumedClusterPartitionShuffleDescriptors.entrySet()) {
+            inputGates.add(
+                    new InputGateDeploymentDescriptor(
+                            entry.getKey(),
+                            ResultPartitionType.BLOCKING_PERSISTENT,
+                            0,

Review Comment:
   I'd suggest explicitly adding this assumption to the comment, because it is 
not easy to understand unless one has read the assumptions/limitations of the 
FLIP: theoretically, at runtime, one partition can be consumed by multiple 
consumers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to