zhuzhurk commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r602884482
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
package org.apache.flink.runtime.scheduler.adapter;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
/** Default implementation of {@link SchedulingPipelinedRegion}. */
public class DefaultSchedulingPipelinedRegion implements
SchedulingPipelinedRegion {
private final Map<ExecutionVertexID, DefaultExecutionVertex>
executionVertices;
- private Set<DefaultResultPartition> consumedResults;
+ private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+ private final Function<IntermediateResultPartitionID,
DefaultResultPartition>
+ resultPartitionRetriever;
+
+ public DefaultSchedulingPipelinedRegion(
+ Set<DefaultExecutionVertex> defaultExecutionVertices,
+ Function<IntermediateResultPartitionID, DefaultResultPartition>
+ resultPartitionRetriever) {
- public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex>
defaultExecutionVertices) {
Preconditions.checkNotNull(defaultExecutionVertices);
this.executionVertices = new HashMap<>();
for (DefaultExecutionVertex executionVertex :
defaultExecutionVertices) {
this.executionVertices.put(executionVertex.getId(),
executionVertex);
}
+
+ this.resultPartitionRetriever = resultPartitionRetriever;
+ }
+
+ @VisibleForTesting
+ public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex>
defaultExecutionVertices) {
Review comment:
Can we avoid introducing such a testing method? I also think
`resultPartitionRetriever` should never be `null`
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -47,12 +47,16 @@
private ResultPartitionState state;
- TestingSchedulingResultPartition(
- IntermediateDataSetID dataSetID, ResultPartitionType type,
ResultPartitionState state) {
+ private TestingSchedulingResultPartition(
+ IntermediateDataSetID dataSetID,
+ int partitionNum,
Review comment:
Which tests will be broken if not doing this change?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
private final DeploymentOption deploymentOption = new
DeploymentOption(false);
- /** Result partitions are correlated if they have the same result id. */
- private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
- correlatedResultPartitions = new HashMap<>();
+ /** ConsumedPartitionGroups are correlated if they have the same result
id. */
+ private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+ correlatedResultPartitionGroups = new HashMap<>();
- private final Map<IntermediateResultPartitionID,
Set<SchedulingPipelinedRegion>>
- partitionConsumerRegions = new HashMap<>();
+ /** External consumer regions of each ConsumedPartitionGroup. */
+ private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+ partitionGroupConsumerRegions = new HashMap<>();
Review comment:
better to use `IdentityHashMap`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
##########
@@ -74,7 +74,10 @@ private void initRegionExecutionViewByVertex() {
if (regionExecutionView.isFinished()) {
final SchedulingPipelinedRegion pipelinedRegion =
schedulingTopology.getPipelinedRegionOfVertex(finishedVertex);
- return
filterReleasablePartitions(pipelinedRegion.getConsumedResults());
+ return filterReleasablePartitions(
Review comment:
I think the input of `filterReleasablePartitions` should be changed to a
`Set` to ensure there is no duplicated element. Or at least we should add a
`unique()` filter in the implementation of `filterReleasablePartitions`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
private final DeploymentOption deploymentOption = new
DeploymentOption(false);
- /** Result partitions are correlated if they have the same result id. */
- private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
- correlatedResultPartitions = new HashMap<>();
+ /** ConsumedPartitionGroups are correlated if they have the same result
id. */
+ private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+ correlatedResultPartitionGroups = new HashMap<>();
- private final Map<IntermediateResultPartitionID,
Set<SchedulingPipelinedRegion>>
- partitionConsumerRegions = new HashMap<>();
+ /** External consumer regions of each ConsumedPartitionGroup. */
+ private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+ partitionGroupConsumerRegions = new HashMap<>();
private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>>
regionVerticesSorted =
new IdentityHashMap<>();
+ /** The ConsumedPartitionGroups which have inter-region producers and
inter-region producers. */
+ private final Set<ConsumedPartitionGroup>
crossRegionConsumerPartitionGroups =
Review comment:
`crossRegionConsumerPartitionGroups` ->
`crossRegionConsumedPartitionGroups `
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
Review comment:
break;
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private SchedulingPipelinedRegion
getProducerRegion(IntermediateResultPartitionID partitionId) {
+ return schedulingTopology.getPipelinedRegionOfVertex(
+
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+ }
+
+ private void initPartitionGroupConsumerRegions() {
+ for (SchedulingPipelinedRegion region :
schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+ ||
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ partitionGroupConsumerRegions
+ .computeIfAbsent(consumedPartitionGroup, group ->
new HashSet<>())
+ .add(region);
+ }
+ }
+ }
+ }
+
+ private void initCorrelatedResultPartitionGroups() {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ partitionGroupConsumerRegions.keySet()) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ correlatedResultPartitionGroups
+ .computeIfAbsent(
+ partitionId.getIntermediateDataSetID(), id ->
new HashSet<>())
+ .add(consumedPartitionGroup);
+ }
+ }
+ }
+
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
- .filter(region ->
!region.getConsumedResults().iterator().hasNext())
+ .filter(this::noExternalBlockingConsumedPartitions)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
+ private boolean
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
Review comment:
I would prefer this method to be `isSourceRegion`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
##########
@@ -23,10 +23,19 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.topology.PipelinedRegion;
+import java.util.List;
+
/** Pipelined region on execution level, i.e., {@link ExecutionGraph} level. */
public interface SchedulingPipelinedRegion
extends PipelinedRegion<
ExecutionVertexID,
IntermediateResultPartitionID,
SchedulingExecutionVertex,
- SchedulingResultPartition> {}
+ SchedulingResultPartition> {
+ /**
+ * Get all blocking {@link ConsumedPartitionGroup}s.
+ *
+ * @return list of {@link ConsumedPartitionGroup}s
+ */
+ List<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups();
Review comment:
I would suggest this to be an `Iterable`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
Review comment:
`initCrossRegionConsumerPartitionGroups` ->
`initCrossRegionConsumedPartitionGroups `
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
private final DeploymentOption deploymentOption = new
DeploymentOption(false);
- /** Result partitions are correlated if they have the same result id. */
- private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
- correlatedResultPartitions = new HashMap<>();
+ /** ConsumedPartitionGroups are correlated if they have the same result
id. */
+ private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+ correlatedResultPartitionGroups = new HashMap<>();
- private final Map<IntermediateResultPartitionID,
Set<SchedulingPipelinedRegion>>
- partitionConsumerRegions = new HashMap<>();
+ /** External consumer regions of each ConsumedPartitionGroup. */
+ private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+ partitionGroupConsumerRegions = new HashMap<>();
private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>>
regionVerticesSorted =
new IdentityHashMap<>();
+ /** The ConsumedPartitionGroups which have inter-region producers and
inter-region producers. */
Review comment:
`which have inter-region producers and inter-region producers.` ->
`which are produced by multiple regions.`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
package org.apache.flink.runtime.scheduler.adapter;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
/** Default implementation of {@link SchedulingPipelinedRegion}. */
public class DefaultSchedulingPipelinedRegion implements
SchedulingPipelinedRegion {
private final Map<ExecutionVertexID, DefaultExecutionVertex>
executionVertices;
- private Set<DefaultResultPartition> consumedResults;
+ private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+ private final Function<IntermediateResultPartitionID,
DefaultResultPartition>
+ resultPartitionRetriever;
+
+ public DefaultSchedulingPipelinedRegion(
+ Set<DefaultExecutionVertex> defaultExecutionVertices,
+ Function<IntermediateResultPartitionID, DefaultResultPartition>
+ resultPartitionRetriever) {
- public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex>
defaultExecutionVertices) {
Preconditions.checkNotNull(defaultExecutionVertices);
this.executionVertices = new HashMap<>();
for (DefaultExecutionVertex executionVertex :
defaultExecutionVertices) {
this.executionVertices.put(executionVertex.getId(),
executionVertex);
}
+
+ this.resultPartitionRetriever = resultPartitionRetriever;
Review comment:
checkNotNull
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex>
regionVertices =
new HashMap<>();
- private final Set<TestingSchedulingResultPartition> consumedPartitions =
new HashSet<>();
+ private final List<ConsumedPartitionGroup> consumedPartitionGroups = new
ArrayList<>();
+
+ private final Map<IntermediateResultPartitionID,
TestingSchedulingResultPartition>
Review comment:
Is it needed as a field?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex>
regionVertices =
new HashMap<>();
- private final Set<TestingSchedulingResultPartition> consumedPartitions =
new HashSet<>();
+ private final List<ConsumedPartitionGroup> consumedPartitionGroups = new
ArrayList<>();
Review comment:
Could we make it a `Set` instead of a `List`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -60,23 +81,33 @@ public DefaultExecutionVertex getVertex(final
ExecutionVertexID vertexId) {
return executionVertex;
}
- @Override
- public Iterable<DefaultResultPartition> getConsumedResults() {
- if (consumedResults == null) {
- initializeConsumedResults();
- }
- return consumedResults;
- }
-
- private void initializeConsumedResults() {
- final Set<DefaultResultPartition> consumedResults = new HashSet<>();
+ private void initializeAllBlockingConsumedPartitionGroups() {
+ final Set<ConsumedPartitionGroup> consumedResultGroupSet = new
HashSet<>();
for (DefaultExecutionVertex executionVertex :
executionVertices.values()) {
- for (DefaultResultPartition resultPartition :
executionVertex.getConsumedResults()) {
- if
(!executionVertices.containsKey(resultPartition.getProducer().getId())) {
- consumedResults.add(resultPartition);
+ for (ConsumedPartitionGroup consumedResultGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+ SchedulingResultPartition resultPartition =
+
resultPartitionRetriever.apply(consumedResultGroup.getFirst());
+ if (resultPartition.getResultType().isBlocking()) {
+ consumedResultGroupSet.add(consumedResultGroup);
}
}
}
- this.consumedResults = Collections.unmodifiableSet(consumedResults);
+
+ this.blockingConsumedPartitionGroups =
Review comment:
There is no need to convert the `Set` to a `List`. I think
`blockingConsumedPartitionGroups` should just be a `Set`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
##########
@@ -31,7 +31,7 @@
import java.util.stream.Collectors;
/** Utils for {@link SchedulingStrategy}. */
-class SchedulingStrategyUtils {
+public class SchedulingStrategyUtils {
Review comment:
There is no need to do this change
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,15 +246,50 @@ private void maybeScheduleRegion(final
SchedulingPipelinedRegion region) {
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
- private boolean areRegionInputsAllConsumable(final
SchedulingPipelinedRegion region) {
- for (SchedulingResultPartition partition :
region.getConsumedResults()) {
- if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+ private boolean areRegionInputsAllConsumable(
+ final SchedulingPipelinedRegion region,
+ final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)) {
+ if
(!isCrossRegionConsumedPartitionConsumable(consumedPartitionGroup, region)) {
+ return false;
+ }
+ } else if
(isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ if (!consumableStatusCache.computeIfAbsent(
+ consumedPartitionGroup,
this::isConsumedPartitionGroupConsumable)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean isConsumedPartitionGroupConsumable(
+ final ConsumedPartitionGroup consumedPartitionGroup) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ if (schedulingTopology.getResultPartition(partitionId).getState()
+ != ResultPartitionState.CONSUMABLE) {
return false;
}
}
return true;
}
+ private boolean isCrossRegionConsumedPartitionConsumable(
+ final ConsumedPartitionGroup consumedPartitionGroup,
+ final SchedulingPipelinedRegion pipelinedRegion) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ if (isExternalConsumedPartition(partitionId, pipelinedRegion)) {
Review comment:
It's better to merge the two nested `if`s into one.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private SchedulingPipelinedRegion
getProducerRegion(IntermediateResultPartitionID partitionId) {
+ return schedulingTopology.getPipelinedRegionOfVertex(
+
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+ }
+
+ private void initPartitionGroupConsumerRegions() {
+ for (SchedulingPipelinedRegion region :
schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+ ||
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ partitionGroupConsumerRegions
+ .computeIfAbsent(consumedPartitionGroup, group ->
new HashSet<>())
+ .add(region);
+ }
+ }
+ }
+ }
+
+ private void initCorrelatedResultPartitionGroups() {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ partitionGroupConsumerRegions.keySet()) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ correlatedResultPartitionGroups
+ .computeIfAbsent(
+ partitionId.getIntermediateDataSetID(), id ->
new HashSet<>())
+ .add(consumedPartitionGroup);
+ }
+ }
+ }
+
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
- .filter(region ->
!region.getConsumedResults().iterator().hasNext())
+ .filter(this::noExternalBlockingConsumedPartitions)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
+ private boolean
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
Review comment:
It is no correct here. If a region has cross-region
`ConsumerPartitionGroups` but has no external `ConsumerPartitionGroup`, it
should also be treated as a source region.
I think we need to add a test case for it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]