This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 5b90c075bf9 [FLINK-31144][coordination] Ignore the input locations of 
a ConsumePartitionGroup if it has too many consumers
5b90c075bf9 is described below

commit 5b90c075bf9b987de2fa4a02e0cf63602152eba1
Author: JunRuiLee <[email protected]>
AuthorDate: Wed Mar 8 10:57:30 2023 +0800

    [FLINK-31144][coordination] Ignore the input locations of a 
ConsumePartitionGroup if it has too many consumers
    
    This closes #22098.
---
 .../DefaultPreferredLocationsRetriever.java        |  8 +++---
 .../DefaultPreferredLocationsRetrieverTest.java    | 30 +++++++---------------
 2 files changed, 12 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
index f3b71366682..d5f0f8dae65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
@@ -92,11 +92,9 @@ public class DefaultPreferredLocationsRetriever implements 
PreferredLocationsRet
                 
inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
         for (ConsumedPartitionGroup consumedPartitionGroup : 
consumedPartitionGroups) {
             // Ignore the location of a consumed partition group if it has too 
many distinct
-            // consumers compared to the consumed partition group size. This 
is to avoid tasks
-            // unevenly distributed on nodes when running batch jobs or 
running jobs in
-            // session/standalone mode.
-            if ((double) consumedPartitionGroup.getConsumerVertexGroup().size()
-                            / consumedPartitionGroup.size()
+            // consumers. This is to avoid tasks unevenly distributed on nodes 
when running batch
+            // jobs or running jobs in session/standalone mode.
+            if (consumedPartitionGroup.getConsumerVertexGroup().size()
                     > MAX_DISTINCT_CONSUMERS_TO_CONSIDER) {
                 continue;
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
index 6fda52dc173..7409e297af3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java
@@ -71,26 +71,14 @@ class DefaultPreferredLocationsRetrieverTest {
 
     @Test
     void testInputLocations() {
-        {
-            final List<TaskManagerLocation> producerLocations =
-                    Collections.singletonList(new LocalTaskManagerLocation());
-            testInputLocationsInternal(
-                    1,
-                    MAX_DISTINCT_CONSUMERS_TO_CONSIDER,
-                    producerLocations,
-                    producerLocations,
-                    Collections.emptySet());
-        }
-        {
-            final List<TaskManagerLocation> producerLocations =
-                    Arrays.asList(new LocalTaskManagerLocation(), new 
LocalTaskManagerLocation());
-            testInputLocationsInternal(
-                    2,
-                    MAX_DISTINCT_CONSUMERS_TO_CONSIDER * 2,
-                    producerLocations,
-                    producerLocations,
-                    Collections.emptySet());
-        }
+        final List<TaskManagerLocation> producerLocations =
+                Collections.singletonList(new LocalTaskManagerLocation());
+        testInputLocationsInternal(
+                1,
+                MAX_DISTINCT_CONSUMERS_TO_CONSIDER,
+                producerLocations,
+                producerLocations,
+                Collections.emptySet());
     }
 
     @Test
@@ -101,7 +89,7 @@ class DefaultPreferredLocationsRetrieverTest {
     @Test
     void testInputLocationsIgnoresEdgeOfTooManyConsumers() {
         testNoPreferredInputLocationsInternal(1, 
MAX_DISTINCT_CONSUMERS_TO_CONSIDER + 1);
-        testNoPreferredInputLocationsInternal(2, 
MAX_DISTINCT_CONSUMERS_TO_CONSIDER * 2 + 1);
+        testNoPreferredInputLocationsInternal(2, 
MAX_DISTINCT_CONSUMERS_TO_CONSIDER + 1);
     }
 
     @Test

Reply via email to