zhuzhurk commented on a change in pull request #10278: [FLINK-14735][scheduler] 
Improve scheduling of all-to-all partitions with ALL input constraint for 
legacy scheduler
URL: https://github.com/apache/flink/pull/10278#discussion_r349040805
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
 ##########
 @@ -164,12 +165,56 @@ public void testInputConstraintALL() throws Exception {
                // Inputs constraint not satisfied after failover
                ev11.fail(new Exception());
 
-
                waitUntilJobRestarted(eg);
 
                assertFalse(ev31.checkInputDependencyConstraints());
        }
 
+       @Test
+       public void testInputConstraintALLPerformance() throws Exception {
+               final int parallelism = 1000;
+               final JobVertex v1 = 
createVertexWithAllInputConstraints("vertex1", parallelism);
+               final JobVertex v2 = 
createVertexWithAllInputConstraints("vertex2", parallelism);
+               final JobVertex v3 = 
createVertexWithAllInputConstraints("vertex3", parallelism);
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+               v2.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+               final ExecutionGraph eg = 
createExecutionGraph(Arrays.asList(v1, v2, v3), InputDependencyConstraint.ALL, 
3000);
+
+               eg.start(mainThreadExecutor);
+               eg.scheduleForExecution();
+
+               for (int i = 0; i < parallelism - 1; i++) {
+                       finishSubtask(eg, v1.getID(), i);
+               }
+
+               final long startTime = System.nanoTime();
+               finishSubtask(eg, v1.getID(), parallelism - 1);
+
+               final Duration duration = Duration.ofNanos(System.nanoTime() - 
startTime);
+               final Duration timeout = Duration.ofSeconds(5);
+
+               assertThat(duration, lessThan(timeout));
+       }
+
+       private static JobVertex createVertexWithAllInputConstraints(String 
name, int parallelism) {
+               final JobVertex v = new JobVertex(name);
+               v.setParallelism(parallelism);
+               v.setInvokableClass(AbstractInvokable.class);
+               v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+               return v;
+       }
+
+       private static void finishSubtask(ExecutionGraph graph, JobVertexID 
jvId, int subtask) {
+               final ExecutionVertex[] vertices = 
graph.getJobVertex(jvId).getTaskVertices();
 
 Review comment:
   Nit: Since `vertices` is only used for getting the `ExecutionAttemptID`, how 
about to directly get the `ExecutionAttemptID` in this statement?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to