This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


View the commit online:
https://github.com/apache/flink/commit/93dfdd05a84f933473c7b22437e12c03239f9462

The following commit(s) were added to refs/heads/master by this push:
     new 93dfdd0  [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for new scheduler
93dfdd0 is described below

commit 93dfdd05a84f933473c7b22437e12c03239f9462
Author: Zhu Zhu <[email protected]>
AuthorDate: Fri Nov 15 14:58:31 2019 +0800

    [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for new scheduler
    
    This avoids the quadratic complexity of the legacy scheduler's input
    availability check, which runs for a consumer vertex each time one of its
    inputs becomes consumable, by performing that check only when legacy
    scheduling is in use.
---
 .../flink/runtime/executiongraph/Execution.java    |  5 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 46 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3392417..6a1118d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -881,8 +881,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                // at least one of the consumer vertex's inputs 
is consumable here. This is to avoid the
                                // O(N) complexity introduced by input 
constraint check for InputDependencyConstraint.ANY,
                                // as we do not want the default scheduling 
performance to be affected.
-                               if 
(consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY 
||
-                                               
consumerVertex.checkInputDependencyConstraints()) {
+                               if (isLegacyScheduling() &&
+                                       
(consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY 
||
+                                               
consumerVertex.checkInputDependencyConstraints())) {
                                        scheduleConsumer(consumerVertex);
                                }
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 46f99f0..a1b5ee3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -31,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -72,6 +75,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -88,6 +92,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -497,6 +502,46 @@ public class DefaultSchedulerTest extends TestLogger {
                assertThat(deployedExecutionVertices, 
contains(executionVertexId, executionVertexId));
        }
 
+       @Test
+       public void testInputConstraintALLPerf() throws Exception { // Performance guard for FLINK-14735: ALL input constraint with ALL_TO_ALL blocking inputs.
+               final int parallelism = 1000; // presumably sized so that quadratic per-update work would blow the timeout below
+               final JobVertex v1 = 
createVertexWithAllInputConstraints("vertex1", parallelism);
+               final JobVertex v2 = 
createVertexWithAllInputConstraints("vertex2", parallelism);
+               final JobVertex v3 = 
createVertexWithAllInputConstraints("vertex3", parallelism);
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING); // v2 consumes every v1 subtask's output
+               v2.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING); // v3 is never finished in this test, so v2's second input stays unavailable
+
+               final JobGraph jobGraph = new JobGraph(v1, v2, v3);
+               final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+               final AccessExecutionJobVertex ejv1 = 
scheduler.requestJob().getAllVertices().get(v1.getID());
+
+               for (int i = 0; i < parallelism - 1; i++) { // finish all but the last vertex1 subtask (not timed)
+                       finishSubtask(scheduler, ejv1, i);
+               }
+
+               final long startTime = System.nanoTime(); // time only the final FINISHED update of vertex1
+               finishSubtask(scheduler, ejv1, parallelism - 1);
+
+               final Duration duration = Duration.ofNanos(System.nanoTime() - 
startTime);
+               final Duration timeout = Duration.ofSeconds(5); // NOTE(review): wall-clock bound; may be flaky on slow or overloaded CI hosts
+
+               assertThat(duration, lessThan(timeout));
+       }
+
+       private static JobVertex createVertexWithAllInputConstraints(String 
name, int parallelism) { // Builds a JobVertex with the given name and parallelism whose input dependency constraint is ALL.
+               final JobVertex v = new JobVertex(name);
+               v.setParallelism(parallelism);
+               v.setInvokableClass(AbstractInvokable.class); // placeholder invokable; presumably never run by this scheduler test — confirm
+               v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+               return v;
+       }
+
+       private static void finishSubtask(DefaultScheduler scheduler, 
AccessExecutionJobVertex vertex, int subtask) { // Reports the current execution attempt of the given subtask to the scheduler as FINISHED.
+               final ExecutionAttemptID attemptId = 
vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+               scheduler.updateTaskExecutionState( // same entry point a TaskManager state update would use — presumably; confirm
+                       new 
TaskExecutionState(scheduler.getJobGraph().getJobID(), attemptId, 
ExecutionState.FINISHED));
+       }
+
        private void acknowledgePendingCheckpoint(final SchedulerBase 
scheduler, final long checkpointId) throws Exception {
                final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
 
@@ -634,5 +679,4 @@ public class DefaultSchedulerTest extends TestLogger {
                
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
                scheduler.startScheduling();
        }
-
 }

Reply via email to