StephanEwen commented on a change in pull request #10278:
[FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL
input constraint for legacy scheduler
URL: https://github.com/apache/flink/pull/10278#discussion_r349061322
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
##########
@@ -164,12 +165,56 @@ public void testInputConstraintALL() throws Exception {
// Inputs constraint not satisfied after failover
ev11.fail(new Exception());
-
waitUntilJobRestarted(eg);
assertFalse(ev31.checkInputDependencyConstraints());
}
+ @Test
+ public void testInputConstraintALLPerformance() throws Exception {
+ final int parallelism = 1000;
+ final JobVertex v1 =
createVertexWithAllInputConstraints("vertex1", parallelism);
+ final JobVertex v2 =
createVertexWithAllInputConstraints("vertex2", parallelism);
+ final JobVertex v3 =
createVertexWithAllInputConstraints("vertex3", parallelism);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
+ v2.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
+
+ final ExecutionGraph eg =
createExecutionGraph(Arrays.asList(v1, v2, v3), InputDependencyConstraint.ALL,
3000);
+
+ eg.start(mainThreadExecutor);
+ eg.scheduleForExecution();
+
+ for (int i = 0; i < parallelism - 1; i++) {
+ finishSubtask(eg, v1.getID(), i);
+ }
+
+ final long startTime = System.nanoTime();
+ finishSubtask(eg, v1.getID(), parallelism - 1);
+
+ final Duration duration = Duration.ofNanos(System.nanoTime() -
startTime);
+ final Duration timeout = Duration.ofSeconds(5);
+
+ assertThat(duration, lessThan(timeout));
+ }
+
+ private static JobVertex createVertexWithAllInputConstraints(String
name, int parallelism) {
+ final JobVertex v = new JobVertex(name);
+ v.setParallelism(parallelism);
+ v.setInvokableClass(AbstractInvokable.class);
+ v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+ return v;
+ }
+
+ private static void finishSubtask(ExecutionGraph graph, JobVertexID
jvId, int subtask) {
+ final ExecutionVertex[] vertices =
graph.getJobVertex(jvId).getTaskVertices();
Review comment:
I think that makes the call sites a bit less readable. I would keep it like
it is for now, tbh.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services