JunRuiLee commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1898321168
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -522,11 +522,42 @@ public JobEdge connectNewDataSetAsInput(
IntermediateDataSetID intermediateDataSetId,
boolean isBroadcast,
boolean isForward) {
Review Comment:
Please annotate this with `@VisibleForTesting`.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -77,6 +77,10 @@ public class StreamEdge implements Serializable {
private final IntermediateDataSetID intermediateDatasetIdToProduce;
+ private boolean existInterInputsKeyCorrelation;
+
+ private boolean existIntraInputKeyCorrelation;
+
Review Comment:
Maybe rename these to `interInputsKeyCorrelation` and `intraInputKeyCorrelation`,
and add detailed comments explaining what these two concepts mean.
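For example, something along these lines. This is only a sketch. `KeyCorrelation` is a hypothetical stand-in class. The meanings in the comments are my assumed reading of the names and of how this PR derives the flags from the partitioner; they are not confirmed semantics.

```java
/**
 * Hypothetical sketch of the renamed key-correlation flags with explanatory comments.
 * The meanings below are an assumed reading, not confirmed semantics.
 */
final class KeyCorrelation {

    /**
     * Intra-input key correlation (assumed meaning): records sharing a key within this one
     * input are related and should be consumed by the same downstream subtask. In this PR it
     * is derived as {@code !partitioner.isPointwise()}.
     */
    private final boolean intraInputKeyCorrelation;

    /**
     * Inter-inputs key correlation (assumed meaning): records sharing a key across different
     * inputs of the same consumer (for example, the two sides of a join) should be consumed
     * by the same downstream subtask. In this PR it is derived as
     * {@code !partitioner.isPointwise() && !partitioner.isBroadcast()}.
     */
    private final boolean interInputsKeyCorrelation;

    KeyCorrelation(boolean intraInputKeyCorrelation, boolean interInputsKeyCorrelation) {
        this.intraInputKeyCorrelation = intraInputKeyCorrelation;
        this.interInputsKeyCorrelation = interInputsKeyCorrelation;
    }

    boolean intraInputKeyCorrelation() {
        return intraInputKeyCorrelation;
    }

    boolean interInputsKeyCorrelation() {
        return interInputsKeyCorrelation;
    }
}
```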
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class BlockingInputInfo implements BlockingResultInfo {
Review Comment:
I'm also curious about the relationship between this class and
`AllToAllBlockingResultInfo` and `PointwiseBlockingResultInfo`.
After this PR, `AllToAllBlockingResultInfo` and `PointwiseBlockingResultInfo`
seem to be no longer needed. Could we instead put the new attributes into
`AllToAllBlockingResultInfo` and `PointwiseBlockingResultInfo` and update
them accordingly?
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -150,6 +154,11 @@ public StreamEdge(
+ outputPartitioner
+ "_"
+ uniqueId;
+ if (outputPartitioner != null) {
+ this.existIntraInputKeyCorrelation = !outputPartitioner.isPointwise();
+ this.existInterInputsKeyCorrelation =
+ !outputPartitioner.isPointwise() && !outputPartitioner.isBroadcast();
Review Comment:
Maybe introduce a method `configureKeyCorrelation(StreamPartitioner<?> partitioner)`,
so that `setPartitioner` can call it as well.
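Here is a minimal sketch of the idea, assuming a simplified edge class. `PartitionerTraits` is a hypothetical stand-in for the two `StreamPartitioner` methods used in the constructor hunk (`isPointwise()` and `isBroadcast()`). `EdgeSketch` is not the real `StreamEdge`. The derivation matches the constructor hunk, but here the constructor and `setPartitioner` share one method.

```java
/** Hypothetical stand-in for the two StreamPartitioner methods used in this PR. */
interface PartitionerTraits {
    boolean isPointwise();

    boolean isBroadcast();

    /** Convenience factory used only by this sketch. */
    static PartitionerTraits of(boolean pointwise, boolean broadcast) {
        return new PartitionerTraits() {
            @Override
            public boolean isPointwise() {
                return pointwise;
            }

            @Override
            public boolean isBroadcast() {
                return broadcast;
            }
        };
    }
}

/** Simplified edge: the constructor and setPartitioner share one derivation point. */
final class EdgeSketch {
    private PartitionerTraits partitioner;
    private boolean intraInputKeyCorrelation;
    private boolean interInputsKeyCorrelation;

    EdgeSketch(PartitionerTraits partitioner) {
        setPartitioner(partitioner);
    }

    /** Replacing the partitioner re-derives the flags, so they cannot go stale. */
    void setPartitioner(PartitionerTraits partitioner) {
        this.partitioner = partitioner;
        configureKeyCorrelation(partitioner);
    }

    /** Same derivation as the constructor hunk; a null partitioner leaves the flags as-is. */
    private void configureKeyCorrelation(PartitionerTraits partitioner) {
        if (partitioner == null) {
            return;
        }
        this.intraInputKeyCorrelation = !partitioner.isPointwise();
        this.interInputsKeyCorrelation = !partitioner.isPointwise() && !partitioner.isBroadcast();
    }

    boolean intraInputKeyCorrelation() {
        return intraInputKeyCorrelation;
    }

    boolean interInputsKeyCorrelation() {
        return interInputsKeyCorrelation;
    }
}
```

With a single derivation point, any later change to the correlation rules only needs to touch `configureKeyCorrelation`.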
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -65,6 +65,12 @@ public class JobEdge implements java.io.Serializable {
/** Optional description of the caching inside an operator, to be displayed in the JSON plan. */
private String operatorLevelCachingDescription;
+ private final int typeNumber;
+
+ private final boolean existInterInputsKeyCorrelation;
+
+ private final boolean existIntraInputKeyCorrelation;
Review Comment:
Ditto: same naming and documentation suggestion as for `StreamEdge`.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -243,4 +255,20 @@ public IntermediateDataSetID getIntermediateDatasetIdToProduce() {
public String getEdgeId() {
return edgeId;
}
+
+ public boolean existInterInputsKeyCorrelation() {
+ return existInterInputsKeyCorrelation;
+ }
+
+ public boolean existIntraInputKeyCorrelation() {
+ return existIntraInputKeyCorrelation;
+ }
+
+ public void setExistInterInputsKeyCorrelation(boolean existInterInputsKeyCorrelation) {
+ this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation;
+ }
+
+ public void setExistIntraInputKeyCorrelation(boolean existIntraInputKeyCorrelation) {
+ this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation;
+ }
Review Comment:
Can the key correlation be changed in all cases? If not, could we add some
checks here?
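If some combinations are invalid, a check could look roughly like this. The sketch assumes one invariant that follows from the constructor logic. Inter-inputs correlation is `!isPointwise() && !isBroadcast()`, which implies intra-input correlation (`!isPointwise()`), so the sketch rejects inter-inputs correlation without intra-input correlation. `CheckedKeyCorrelation` is hypothetical. The local `checkState` helper only mirrors `Preconditions.checkState` so that the snippet compiles without Flink.

```java
/** Hypothetical sketch: setters that reject combinations the PR's derivation never produces. */
final class CheckedKeyCorrelation {
    private boolean intraInputKeyCorrelation;
    private boolean interInputsKeyCorrelation;

    CheckedKeyCorrelation(boolean intra, boolean inter) {
        // Assumed invariant: inter-inputs correlation implies intra-input correlation.
        checkState(!inter || intra, "Inter-inputs key correlation requires intra-input key correlation.");
        this.intraInputKeyCorrelation = intra;
        this.interInputsKeyCorrelation = inter;
    }

    void setInterInputsKeyCorrelation(boolean inter) {
        checkState(
                !inter || intraInputKeyCorrelation,
                "Inter-inputs key correlation requires intra-input key correlation.");
        this.interInputsKeyCorrelation = inter;
    }

    void setIntraInputKeyCorrelation(boolean intra) {
        checkState(
                intra || !interInputsKeyCorrelation,
                "Cannot clear intra-input key correlation while inter-inputs key correlation is set.");
        this.intraInputKeyCorrelation = intra;
    }

    boolean intraInputKeyCorrelation() {
        return intraInputKeyCorrelation;
    }

    boolean interInputsKeyCorrelation() {
        return interInputsKeyCorrelation;
    }

    /** Local stand-in for Preconditions.checkState: throws IllegalStateException on failure. */
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```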
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java:
##########
+public class BlockingInputInfo implements BlockingResultInfo {
Review Comment:
Please add Javadoc. Also, this class can be package-private.
--
This is an automated message from the Apache Git Service.