zentol commented on a change in pull request #6883: [FLINK-10610] [tests] Port
slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r254204242
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
##########
@@ -317,6 +318,96 @@ public void testSchedulingAllAtOnce() throws Exception {
}
}
+
+
+ /**
+ * This job runs in N slots with N senders and N receivers.
+ * Unless slot sharing is used, it cannot complete.
+ */
+ @Test
+ public void testSlotSharingForForwardJob() throws Exception {
+ final int parallelism = 11;
+
+ final MiniClusterConfiguration cfg = new
MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(parallelism)
+ .setConfiguration(getDefaultConfiguration())
+ .build();
+
+ try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+ miniCluster.start();
+
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(CountDownLatchedSender.class);
+ sender.setParallelism(parallelism);
+
+ final JobVertex receiver = new JobVertex("Receiver");
+
receiver.setInvokableClass(CountDownLatchedReceiver.class);
+ receiver.setParallelism(parallelism);
+
+ receiver.connectNewDataSetAsInput(sender,
DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED);
+
+ final CountDownLatch countDownLatch = new
CountDownLatch(parallelism);
+ CountDownLatchedSender.setLatch(countDownLatch);
+ CountDownLatchedReceiver.setLatch(countDownLatch);
+
+ final SlotSharingGroup sharingGroup = new
SlotSharingGroup(sender.getID(), receiver.getID());
+ sender.setSlotSharingGroup(sharingGroup);
+ receiver.setSlotSharingGroup(sharingGroup);
+ receiver.setStrictlyCoLocatedWith(sender);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job",
sender, receiver);
+
+ miniCluster.executeJobBlocking(jobGraph);
+ }
+ }
+
+ @Test
+ public void testSlotSharingForTwoInputsJob() throws Exception {
Review comment:
Add Javadoc, similar to the one above.
```
/**
* This job runs in N slots with 2 * N senders and N receivers.
* Unless slot sharing is used, it cannot complete.
*/
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services