TisonKun commented on a change in pull request #6883: [FLINK-10610] [tests]
Port slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r254611464
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
##########
@@ -317,6 +318,96 @@ public void testSchedulingAllAtOnce() throws Exception {
}
}
+
+
+ /**
+ * This job runs in N slots with N senders and N receivers.
+ * Unless slot sharing is used, it cannot complete.
+ */
+ @Test
+ public void testSlotSharingForForwardJob() throws Exception {
+ final int parallelism = 11;
+
+ final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(parallelism)
+ .setConfiguration(getDefaultConfiguration())
+ .build();
+
+ try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+ miniCluster.start();
+
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(CountDownLatchedSender.class);
+ sender.setParallelism(parallelism);
+
+ final JobVertex receiver = new JobVertex("Receiver");
+ receiver.setInvokableClass(CountDownLatchedReceiver.class);
+ receiver.setParallelism(parallelism);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+ CountDownLatchedSender.setLatch(countDownLatch);
+ CountDownLatchedReceiver.setLatch(countDownLatch);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+ sender.setSlotSharingGroup(sharingGroup);
+ receiver.setSlotSharingGroup(sharingGroup);
+ receiver.setStrictlyCoLocatedWith(sender);
Review comment:
Previously, we only had slot sharing with colocation constraints, in
`CoLocationConstraintITCase.scala`. For test coverage, should I add a test case
that is nearly the same as this one, except without the statement
`receiver.setStrictlyCoLocatedWith(sender)`?
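
For context on why the test in the diff depends on slot sharing, the slot arithmetic from its Javadoc can be sketched as a toy model. This is not Flink code: the class `SlotDemand` and its methods are hypothetical names used only for illustration, and the sketch assumes that all sender and receiver subtasks must hold a slot at the same time.

```java
// Toy model only: not part of Flink. SlotDemand and its methods are hypothetical.
final class SlotDemand {

    // Without slot sharing, every parallel subtask occupies a slot of its own,
    // so the demand is the sum of the vertex parallelisms.
    static int slotsWithoutSharing(int... parallelisms) {
        int total = 0;
        for (int p : parallelisms) {
            total += p;
        }
        return total;
    }

    // With all vertices in one slot sharing group, a slot can hold one subtask
    // of each vertex, so the demand is the highest vertex parallelism.
    static int slotsWithSharing(int... parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        int parallelism = 11;        // same value as in the test
        int available = parallelism; // 1 TaskManager with 11 slots

        int unshared = slotsWithoutSharing(parallelism, parallelism);
        int shared = slotsWithSharing(parallelism, parallelism);

        System.out.println("available slots:       " + available);
        System.out.println("needed without sharing: " + unshared);
        System.out.println("needed with sharing:    " + shared);
        System.out.println("fits without sharing:   " + (unshared <= available));
        System.out.println("fits with sharing:      " + (shared <= available));
    }
}
```

Under these assumptions, N senders and N receivers fit into N slots only when both vertices share a group. The count is the same with or without `setStrictlyCoLocatedWith`, so a variant without that statement would exercise slot sharing alone.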