zhuzhurk commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r872369788
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java:
##########
@@ -42,6 +43,8 @@ public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegion {
     private Set<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+    private Set<ConsumedPartitionGroup> persistentConsumedPartitionGroups;
Review Comment:
How about adding a `ResultPartitionType` field (which contains the `isPersistent` info) to `ConsumedPartitionGroup`?
I think it can simplify things a lot.
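To illustrate the suggestion, here is a minimal sketch (stand-in types, not the actual Flink classes; the enum constants and accessors are assumptions) of a `ConsumedPartitionGroup` that carries its `ResultPartitionType`, so callers can ask the group directly whether its partitions are persistent instead of maintaining separate sets:

```java
import java.util.List;

// Stand-in for Flink's ResultPartitionType: each type knows whether it is
// blocking and whether its data is persistent (survives job termination).
enum ResultPartitionType {
    PIPELINED(false, false),
    BLOCKING(true, false),
    BLOCKING_PERSISTENT(true, true);

    private final boolean isBlocking;
    private final boolean isPersistent;

    ResultPartitionType(boolean isBlocking, boolean isPersistent) {
        this.isBlocking = isBlocking;
        this.isPersistent = isPersistent;
    }

    boolean isBlocking() { return isBlocking; }
    boolean isPersistent() { return isPersistent; }
}

// Stand-in ConsumedPartitionGroup that stores the type of its partitions.
class ConsumedPartitionGroup {
    private final List<String> partitionIds; // stand-in for partition IDs
    private final ResultPartitionType resultPartitionType;

    ConsumedPartitionGroup(List<String> partitionIds, ResultPartitionType type) {
        this.partitionIds = partitionIds;
        this.resultPartitionType = type;
    }

    ResultPartitionType getResultPartitionType() {
        return resultPartitionType;
    }
}
```

With the type on the group, `DefaultSchedulingPipelinedRegion` could filter groups by `getResultPartitionType().isPersistent()` on demand rather than tracking a dedicated `persistentConsumedPartitionGroups` set.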
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -481,8 +481,18 @@ public IntermediateDataSet createAndAddResultDataSet(
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
+        return this.connectNewDataSetAsInput(
+                input, distPattern, partitionType, new IntermediateDataSetID());
+    }
+
+    public JobEdge connectNewDataSetAsInput(
+            JobVertex input,
+            DistributionPattern distPattern,
+            ResultPartitionType partitionType,
+            IntermediateDataSetID intermediateDataSetID) {
Review Comment:
NIT: `intermediateDataSetID` -> `intermediateDataSetId`.
This is the naming convention we currently follow, although some old code still uses `*ID` variable names.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -85,7 +88,9 @@ private TaskDeploymentDescriptorFactory(
             List<ConsumedPartitionGroup> consumedPartitionGroups,
             Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever,
-            BlobWriter blobWriter) {
+            BlobWriter blobWriter,
+            IntermediateDataSetID cachedIntermediateDataSetID,
+            Collection<? extends ShuffleDescriptor> clusterPartitionShuffleDescriptors) {
Review Comment:
`clusterPartitionShuffleDescriptors` -> `consumedClusterPartitionShuffleDescriptors`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -241,6 +271,23 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
             ExecutionVertex executionVertex, int attemptNumber) throws IOException {
         InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
+        final IntermediateDataSetID intermediateDataSetID =
+                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIDToConsume();
+        Collection<? extends ShuffleDescriptor> clusterPartitionShuffleDescriptors = null;
Review Comment:
I prefer to make `clusterPartitionShuffleDescriptors` an empty list when `intermediateDataSetID` is null, instead of making it nullable.
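A minimal sketch of this suggestion (the class and method names here are stand-ins, not the actual Flink API): default to an empty collection so downstream code never needs a null check.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class ShuffleDescriptorLookup {

    // Stand-in for fetching the cluster-partition shuffle descriptors of a
    // cached intermediate dataset (here represented by a plain String id).
    static Collection<String> lookup(String consumedClusterDataSetId) {
        return List.of("descriptor-0", "descriptor-1");
    }

    // Returns an empty list, never null, when the vertex consumes no cached
    // result, so callers can iterate unconditionally.
    static Collection<String> clusterPartitionShuffleDescriptors(
            String consumedClusterDataSetId) {
        return consumedClusterDataSetId == null
                ? Collections.emptyList()
                : lookup(consumedClusterDataSetId);
    }
}
```

With this shape, `TaskDeploymentDescriptorFactory` can always iterate the collection without a `null` branch.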
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -175,9 +194,27 @@ public JobVertex(String name, JobVertexID id) {
      * @param operatorIDPairs The operator ID pairs of the job vertex.
      */
     public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) {
+        this(name, primaryId, operatorIDPairs, null);
+    }
+
+    /**
+     * Constructs a new job vertex and assigns it with the given name.
+     *
+     * @param name The name of the new job vertex.
+     * @param primaryId The id of the job vertex.
+     * @param operatorIDPairs The operator ID pairs of the job vertex.
+     * @param intermediateDataSetID The id of the cached intermediate dataset that the job vertex
+     *     consumes.
+     */
+    public JobVertex(
Review Comment:
Looks to me like it is only used in tests and could be replaced with `JobVertex#createAndAddResultDataSet`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -253,7 +300,9 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
                 executionVertex.getParallelSubtaskIndex(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                intermediateDataSetID,
Review Comment:
The param `intermediateDataSetID` is not needed because it can be obtained from the shuffle descriptor.
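To sketch what "obtained from the shuffle descriptor" could look like (all types below are simplified stand-ins, not the real Flink classes): the descriptor's result partition id carries the dataset id, so the factory can derive it instead of taking it as a separate parameter.

```java
class ShuffleDescriptorSketch {

    // Stand-in for IntermediateDataSetID.
    static class IntermediateDataSetID {
        private final String value;
        IntermediateDataSetID(String value) { this.value = value; }
        String getValue() { return value; }
    }

    // Stand-in for ResultPartitionID: knows which dataset it belongs to.
    static class ResultPartitionID {
        private final IntermediateDataSetID dataSetId;
        ResultPartitionID(IntermediateDataSetID dataSetId) { this.dataSetId = dataSetId; }
        IntermediateDataSetID getIntermediateDataSetId() { return dataSetId; }
    }

    // Stand-in for ShuffleDescriptor: exposes its result partition id.
    static class ShuffleDescriptor {
        private final ResultPartitionID resultPartitionId;
        ShuffleDescriptor(ResultPartitionID id) { this.resultPartitionId = id; }
        ResultPartitionID getResultPartitionID() { return resultPartitionId; }
    }

    // The dataset id is recoverable from any descriptor of that dataset, so
    // it does not need to travel as an extra constructor parameter.
    static IntermediateDataSetID dataSetIdOf(ShuffleDescriptor descriptor) {
        return descriptor.getResultPartitionID().getIntermediateDataSetId();
    }
}
```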
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -253,7 +300,9 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
                 executionVertex.getParallelSubtaskIndex(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                intermediateDataSetID,
+                clusterPartitionShuffleDescriptors);
Review Comment:
Can we pass in shuffle descriptors that are already filtered? I think it can simplify the logic.
For each consumed cluster/cached result, the shuffle descriptor should be `clusterPartitionShuffleDescriptors[subtaskIndex]`.
We can move the check `descriptor.getResultPartitionID().getPartitionId().getPartitionNumber() == subtaskIndex` here to ensure this assumption is correct.
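A sketch of the suggested pre-filtering (again with stand-in types, not Flink's): assuming the consumed cluster result has one partition per subtask, subtask `i` should receive descriptor `i`, and the relocated partition-number check verifies that assumption at lookup time.

```java
import java.util.List;

class ClusterPartitionAssignment {

    // Stand-in for a shuffle descriptor that exposes its partition number.
    static class ShuffleDescriptor {
        private final int partitionNumber;
        ShuffleDescriptor(int partitionNumber) { this.partitionNumber = partitionNumber; }
        int getPartitionNumber() { return partitionNumber; }
    }

    // Picks the descriptor for one subtask from the pre-filtered list and
    // verifies the one-partition-per-subtask ordering assumption.
    static ShuffleDescriptor descriptorForSubtask(
            List<ShuffleDescriptor> clusterPartitionShuffleDescriptors, int subtaskIndex) {
        ShuffleDescriptor descriptor = clusterPartitionShuffleDescriptors.get(subtaskIndex);
        if (descriptor.getPartitionNumber() != subtaskIndex) {
            throw new IllegalStateException(
                    "Descriptor at index " + subtaskIndex
                            + " points at partition " + descriptor.getPartitionNumber());
        }
        return descriptor;
    }
}
```

The check throws as soon as the descriptor order diverges from the subtask index, instead of letting a wrongly assigned partition surface later during execution.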
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -241,6 +271,23 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
             ExecutionVertex executionVertex, int attemptNumber) throws IOException {
         InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
+        final IntermediateDataSetID intermediateDataSetID =
Review Comment:
`intermediateDataSetID` -> `consumedClusterResultId` or `consumedClusterDataSetId`
That would be easier to understand, because:
1. a vertex can produce partitions as well as consume partitions
2. a result contains multiple result partitions, and a cluster result contains multiple cluster result partitions (cluster partitions for short)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]