zhuzhurk commented on a change in pull request #15088:
URL: https://github.com/apache/flink/pull/15088#discussion_r594996057
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
##########
@@ -44,4 +46,14 @@
* @return result partition state
*/
ResultPartitionState getState();
+
+ /**
+ * Get the grouped {@link ExecutionVertexID}.
+ *
+ * @return List of grouped Execution Vertex IDs.
+ */
+ List<ConsumerVertexGroup> getConsumerVertexGroups();
+
+ /** Get {@link SchedulingExecutionVertex} by {@link ExecutionVertexID}. */
+ SchedulingExecutionVertex getVertexOrThrow(ExecutionVertexID id);
Review comment:
Similar to the other comment. Why do we need to add this method to the
interface? I do not find it publicly used except in tests.
Besides that, we also have `SchedulingTopology#getVertex(id)`.
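For example, a test could resolve the vertex through the topology instead (a minimal sketch; `schedulingTopology` stands in for whatever `SchedulingTopology` instance the test already holds):
```
// Hypothetical usage: look the vertex up on the topology rather than on the partition.
SchedulingExecutionVertex vertex = schedulingTopology.getVertex(executionVertexId);
```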
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
##########
@@ -37,4 +39,14 @@
* @return state of the execution vertex
*/
ExecutionState getState();
+
+ /**
+ * Get the grouped consumed result partitions.
+ *
+ * @return list of grouped intermediate result partition IDs
+ */
+ List<ConsumedPartitionGroup> getConsumerPartitionGroups();
+
+ /** Get {@link SchedulingResultPartition} by {@link IntermediateResultPartitionID}. */
+ SchedulingResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID id);
Review comment:
Why do we need to add this method to the interface? I do not find it publicly used except in tests.
Besides that, we also have `SchedulingTopology#getResultPartition(id)`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
##########
@@ -123,7 +123,7 @@ public void returnsIncidentBlockingPartitions() throws Exception {
final DefaultExecutionVertex vertexB0 =
topology.getVertex(new ExecutionVertexID(b.getID(), 0));
final IntermediateResultPartitionID b0ConsumedResultPartition =
- Iterables.getOnlyElement(vertexB0.getConsumedResults()).getId();
Review comment:
I think this change is not needed, and it even skips covering the change that flattens the group.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -66,16 +76,39 @@ public void setState(ExecutionState state) {
@Override
public Iterable<TestingSchedulingResultPartition> getConsumedResults() {
- return consumedPartitions;
+ return () ->
+ createFlattenIterator(
+ this::getConsumerPartitionGroups, this::getResultPartitionOrThrow);
}
@Override
public Iterable<TestingSchedulingResultPartition> getProducedResults() {
return producedPartitions;
}
- void addConsumedPartition(TestingSchedulingResultPartition partition) {
- consumedPartitions.add(partition);
+ @Override
+ public List<ConsumedPartitionGroup> getConsumerPartitionGroups() {
+ return consumedPartitionGroups;
+ }
+
+ @Override
+ public TestingSchedulingResultPartition getResultPartitionOrThrow(
+ IntermediateResultPartitionID id) {
+ return resultPartitionsById.get(id);
+ }
+
void addConsumedPartition(TestingSchedulingResultPartition consumedPartition) {
+ this.consumedPartitionGroups.add(
+ ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId()));
+ this.resultPartitionsById.putIfAbsent(consumedPartition.getId(), consumedPartition);
+ }
+
+ void addConsumedPartitionGroup(
+ ConsumedPartitionGroup consumedResultIdGroup,
Review comment:
`consumedResultIdGroup` -> `consumedPartitionGroup`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
##########
@@ -44,4 +46,14 @@
* @return result partition state
*/
ResultPartitionState getState();
+
+ /**
+ * Get the grouped {@link ExecutionVertexID}.
Review comment:
I think "Get the {@link ConsumerVertexGroup}s" is clear enough and is
more accurate.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
##########
@@ -112,4 +117,33 @@ private static CheckpointIDCounter createCheckpointIdCounter(
CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception {
return recoveryFactory.createCheckpointIDCounter(jobId);
}
+
+ public static <ID, GROUP extends Iterable<ID>, RESULT> Iterator<RESULT> createFlattenIterator(
Review comment:
This method looks too generic to be part of the scheduler. It is essentially a `flatMap`, so we could add it to `IterableUtils` under a more common name:
```
@Internal
public static <K, V, G extends Iterable<K>> Iterable<V> flatMap(
        Iterable<G> itemGroups, Function<K, V> mapper) {
    return () ->
            new Iterator<V>() {
                private final Iterator<G> groupIterator = itemGroups.iterator();
                private Iterator<K> itemIterator;

                @Override
                public boolean hasNext() {
                    while (itemIterator == null || !itemIterator.hasNext()) {
                        if (!groupIterator.hasNext()) {
                            return false;
                        } else {
                            itemIterator = groupIterator.next().iterator();
                        }
                    }
                    return true;
                }

                @Override
                public V next() {
                    if (hasNext()) {
                        return mapper.apply(itemIterator.next());
                    } else {
                        throw new NoSuchElementException();
                    }
                }
            };
}
```
I would also suggest annotating it with `@Internal` because it is in
`flink-core`.
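For illustration, a standalone usage of the proposed `flatMap` (the values here are made up for the example):
```
// Groups are flattened lazily; each item is mapped only when the iterator reaches it.
List<List<String>> groups =
        Arrays.asList(Arrays.asList("a", "bb"), Collections.singletonList("ccc"));
Iterable<Integer> lengths = IterableUtils.flatMap(groups, String::length);
// Iterating over lengths yields 1, 2, 3.
```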
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
##########
@@ -43,18 +47,38 @@
private DefaultExecutionVertex producer;
- private final List<DefaultExecutionVertex> consumers;
+ private final List<ConsumerVertexGroup> consumerVertexGroups;
+
+ private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVertexById;
Review comment:
I think an `executionVertexRetriever` (`Function<ExecutionVertexID, DefaultExecutionVertex>`) would be better.
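A sketch of how `DefaultResultPartition` could use such a retriever, assuming `ConsumerVertexGroup` iterates over `ExecutionVertexID`s and the `IterableUtils.flatMap` proposed above (names are suggestions, not from the PR):
```
private final Function<ExecutionVertexID, DefaultExecutionVertex> executionVertexRetriever;

@Override
public Iterable<DefaultExecutionVertex> getConsumers() {
    // Resolve each grouped ExecutionVertexID lazily through the retriever.
    return IterableUtils.flatMap(consumerVertexGroups, executionVertexRetriever);
}
```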
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
##########
@@ -65,12 +68,20 @@ public void setUp() throws Exception {
Collections.singletonList(schedulingResultPartition),
stateSupplier);
schedulingResultPartition.setProducer(producerVertex);
+
+ List<ConsumedPartitionGroup> consumedPartitionIds =
Review comment:
`consumedPartitionIds` -> `consumedPartitionGroups`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
##########
@@ -37,4 +39,14 @@
* @return state of the execution vertex
*/
ExecutionState getState();
+
+ /**
+ * Get the grouped consumed result partitions.
Review comment:
I think "Get the {@link ConsumedPartitionGroup}s" is clear enough and is
more accurate. One can look into the definition of `ConsumedPartitionGroup` if
interested.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
##########
@@ -97,9 +108,8 @@ public void testGetProducedResultPartitions() {
@Test
public void testGetConsumedResultPartitions() {
IntermediateResultPartitionID partitionIds1 =
- IterableUtils.toStream(consumerVertex.getConsumedResults())
+ IterableUtils.toStream(consumerVertex.getConsumerPartitionGroups().get(0))
Review comment:
I think this change is not needed, and it even skips covering the change that flattens the group.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -95,7 +128,9 @@ public static TestingSchedulingExecutionVertex withExecutionVertexID(
public static class Builder {
private JobVertexID jobVertexId = new JobVertexID();
private int subtaskIndex = 0;
- private List<TestingSchedulingResultPartition> partitions = new ArrayList<>();
+ private final List<ConsumedPartitionGroup> partitions = new ArrayList<>();
Review comment:
`partitions` -> `consumedPartitionGroups`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
##########
@@ -18,35 +18,52 @@
package org.apache.flink.runtime.scheduler.adapter;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
+import static org.apache.flink.runtime.scheduler.SchedulerUtils.createFlattenIterator;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Default implementation of {@link SchedulingExecutionVertex}. */
class DefaultExecutionVertex implements SchedulingExecutionVertex {
private final ExecutionVertexID executionVertexId;
- private final List<DefaultResultPartition> consumedResults;
-
private final List<DefaultResultPartition> producedResults;
private final Supplier<ExecutionState> stateSupplier;
+ private final List<ConsumedPartitionGroup> consumedPartitionGroups;
+
+ private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;
Review comment:
I think a `resultPartitionRetriever` (`Function<IntermediateResultPartitionID, DefaultResultPartition>`) would be better. It is unmodifiable, and we can avoid introducing the `getResultPartitionOrThrow` method.
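For instance, the adapter that builds the topology could pass the lookup in as a method reference, so no extra accessor is needed (sketch; the constructor argument order is illustrative):
```
// The vertex only receives a read-only lookup function, not the mutable map itself.
new DefaultExecutionVertex(
        executionVertexId,
        producedPartitions,
        stateSupplier,
        consumedPartitionGroups,
        resultPartitionsById::get);
```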
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -104,8 +139,20 @@ Builder withExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
return this;
}
- public Builder withConsumedPartitions(List<TestingSchedulingResultPartition> partitions) {
- this.partitions = partitions;
+ public Builder withConsumedPartitions(
Review comment:
`withConsumedPartitions` -> `withConsumedPartitionGroups`
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
##########
@@ -140,6 +143,20 @@ private void addSchedulingResultPartitions(
}
}
+ private void addSchedulingResultPartitions(
+ final Iterable<ConsumedPartitionGroup> resultPartitionGroups,
+ final Function<IntermediateResultPartitionID, SchedulingResultPartition>
+ resultPartitionById) {
+ for (ConsumedPartitionGroup consumedPartitionGroup : resultPartitionGroups) {
+ for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
+ schedulingResultPartitions.put(
+ consumedPartitionId,
+ (TestingSchedulingResultPartition)
Review comment:
By changing `Function<IntermediateResultPartitionID, SchedulingResultPartition>` to `Function<IntermediateResultPartitionID, TestingSchedulingResultPartition>`, this forced cast can be avoided.
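A sketch of the suggested signature (the loop body completes the truncated hunk above, under the assumption that the function result is what gets stored):
```
private void addSchedulingResultPartitions(
        final Iterable<ConsumedPartitionGroup> consumedPartitionGroups,
        final Function<IntermediateResultPartitionID, TestingSchedulingResultPartition>
                resultPartitionById) {
    for (ConsumedPartitionGroup group : consumedPartitionGroups) {
        for (IntermediateResultPartitionID id : group) {
            // The function already yields the testing subtype, so no cast is needed.
            schedulingResultPartitions.put(id, resultPartitionById.apply(id));
        }
    }
}
```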
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -104,8 +139,20 @@ Builder withExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
return this;
}
- public Builder withConsumedPartitions(List<TestingSchedulingResultPartition> partitions) {
- this.partitions = partitions;
+ public Builder withConsumedPartitions(
+ List<ConsumedPartitionGroup> partitions,
Review comment:
`partitions` -> `consumedPartitionGroups`
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -38,17 +41,24 @@
private TestingSchedulingExecutionVertex producer;
- private Collection<TestingSchedulingExecutionVertex> consumers;
+ private final List<ConsumerVertexGroup> consumerVertexGroups;
+
+ private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> executionVerticesById;
private ResultPartitionState state;
TestingSchedulingResultPartition(
- IntermediateDataSetID dataSetID, ResultPartitionType type, ResultPartitionState state) {
+ IntermediateDataSetID dataSetID,
+ int partitionNum,
Review comment:
Why do we need to specify the `partitionNum`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]