This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 5b90c075bf9 [FLINK-31144][coordination] Ignore the input locations of a ConsumedPartitionGroup if it has too many consumers
5b90c075bf9 is described below
commit 5b90c075bf9b987de2fa4a02e0cf63602152eba1
Author: JunRuiLee <[email protected]>
AuthorDate: Wed Mar 8 10:57:30 2023 +0800
[FLINK-31144][coordination] Ignore the input locations of a ConsumedPartitionGroup if it has too many consumers
This closes #22098.
---
.../DefaultPreferredLocationsRetriever.java | 8 +++---
.../DefaultPreferredLocationsRetrieverTest.java | 30 +++++++---------------
2 files changed, 12 insertions(+), 26 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
index f3b71366682..d5f0f8dae65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
@@ -92,11 +92,9 @@ public class DefaultPreferredLocationsRetriever implements PreferredLocationsRet
inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
// Ignore the location of a consumed partition group if it has too many distinct
- // consumers compared to the consumed partition group size. This is to avoid tasks
- // unevenly distributed on nodes when running batch jobs or running jobs in
- // session/standalone mode.
- if ((double) consumedPartitionGroup.getConsumerVertexGroup().size()
- / consumedPartitionGroup.size()
+ // consumers. This is to avoid tasks unevenly distributed on nodes when running batch
+ // jobs or running jobs in session/standalone mode.
+ if (consumedPartitionGroup.getConsumerVertexGroup().size()
> MAX_DISTINCT_CONSUMERS_TO_CONSIDER) {
continue;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
index 6fda52dc173..7409e297af3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
@@ -71,26 +71,14 @@ class DefaultPreferredLocationsRetrieverTest {
@Test
void testInputLocations() {
- {
- final List<TaskManagerLocation> producerLocations =
- Collections.singletonList(new LocalTaskManagerLocation());
- testInputLocationsInternal(
- 1,
- MAX_DISTINCT_CONSUMERS_TO_CONSIDER,
- producerLocations,
- producerLocations,
- Collections.emptySet());
- }
- {
- final List<TaskManagerLocation> producerLocations =
- Arrays.asList(new LocalTaskManagerLocation(), new LocalTaskManagerLocation());
- testInputLocationsInternal(
- 2,
- MAX_DISTINCT_CONSUMERS_TO_CONSIDER * 2,
- producerLocations,
- producerLocations,
- Collections.emptySet());
- }
+ final List<TaskManagerLocation> producerLocations =
+ Collections.singletonList(new LocalTaskManagerLocation());
+ testInputLocationsInternal(
+ 1,
+ MAX_DISTINCT_CONSUMERS_TO_CONSIDER,
+ producerLocations,
+ producerLocations,
+ Collections.emptySet());
}
@Test
@@ -101,7 +89,7 @@ class DefaultPreferredLocationsRetrieverTest {
@Test
void testInputLocationsIgnoresEdgeOfTooManyConsumers() {
testNoPreferredInputLocationsInternal(1, MAX_DISTINCT_CONSUMERS_TO_CONSIDER + 1);
- testNoPreferredInputLocationsInternal(2, MAX_DISTINCT_CONSUMERS_TO_CONSIDER * 2 + 1);
+ testNoPreferredInputLocationsInternal(2, MAX_DISTINCT_CONSUMERS_TO_CONSIDER + 1);
}
@Test