zentol commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r255051998
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
##########
@@ -337,6 +346,264 @@ public void testSchedulingAllAtOnce() throws Exception {
}
}
+    @Test
+    public void testSlotSharingForForwardJobWithCoLocationConstraint() throws Exception {
+        testSlotSharingForForwardJob(true);
+    }
+
+    @Test
+    public void testSlotSharingForForwardJobWithoutCoLocationConstraint() throws Exception {
+        testSlotSharingForForwardJob(false);
+    }
+
+    /**
+     * This job runs in N slots with N senders and N receivers.
+     * Unless slot sharing is used, it cannot complete.
+     * It should make no difference whether the co-location constraint is used or not.
+     */
+    private synchronized void testSlotSharingForForwardJob(boolean withCoLocationConstraint) throws Exception {
+        final int parallelism = 11;
+
+        final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+            .setNumTaskManagers(1)
+            .setNumSlotsPerTaskManager(parallelism)
+            .setConfiguration(getDefaultConfiguration())
+            .build();
+
+        try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+
+            final JobVertex sender = new JobVertex("Sender");
+            sender.setInvokableClass(CountDownLatchedSender.class);
+            sender.setParallelism(parallelism);
+
+            final JobVertex receiver = new JobVertex("Receiver");
+            receiver.setInvokableClass(CountDownLatchedReceiver.class);
+            receiver.setParallelism(parallelism);
+
+            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+                ResultPartitionType.PIPELINED);
+
+            final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+            CountDownLatchedSender.setLatch(countDownLatch);
+            CountDownLatchedReceiver.setLatch(countDownLatch);
+
+            final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+            sender.setSlotSharingGroup(sharingGroup);
+            receiver.setSlotSharingGroup(sharingGroup);
+
+            if (withCoLocationConstraint) {
+                receiver.setStrictlyCoLocatedWith(sender);
+            }
+
+            final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+    }
+
+    /**
+     * A sender that does not exit until all receivers are running.
+     */
+    public static class CountDownLatchedSender extends AbstractInvokable {
+
+        private static CountDownLatch latch;
+
+        static void setLatch(CountDownLatch latch) {
+            CountDownLatchedSender.latch = latch;
+        }
+
+        public CountDownLatchedSender(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+
+            try {
+                writer.emit(new IntValue(42));
+                writer.emit(new IntValue(1337));
+                writer.flushAll();
+            } finally {
+                writer.clearBuffers();
+                latch.await();
+            }
+        }
+    }
+
+    /**
+     * A receiver that counts down the latch once it starts running.
+     */
+    public static class CountDownLatchedReceiver extends AbstractInvokable {
+
+        private static CountDownLatch latch;
+
+        static void setLatch(CountDownLatch latch) {
+            CountDownLatchedReceiver.latch = latch;
+        }
+
+        public CountDownLatchedReceiver(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            latch.countDown();
+
+            RecordReader<IntValue> reader = new RecordReader<>(
+                getEnvironment().getInputGate(0),
+                IntValue.class,
+                getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+            IntValue i1 = reader.next();
+            IntValue i2 = reader.next();
+            IntValue i3 = reader.next();
+
+            if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+                throw new Exception("Wrong data received.");
+            }
+        }
+    }
+
+    /**
+     * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
+     * it cannot complete.
+     */
+    @Test
+    public void testSlotSharingForTwoInputsJob() throws Exception {
+        final int parallelism = 11;
+
+        final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+            .setNumTaskManagers(1)
+            .setNumSlotsPerTaskManager(parallelism)
+            .setConfiguration(getDefaultConfiguration())
+            .build();
+
+        try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+
+            final JobVertex sender1 = new JobVertex("Sender1");
+            sender1.setInvokableClass(CountDownLatchedSender.class);
+            sender1.setParallelism(parallelism);
+
+            final JobVertex sender2 = new JobVertex("Sender2");
+            sender2.setInvokableClass(CountDownLatchedSender.class);
+            sender2.setParallelism(parallelism);
+
+            final JobVertex receiver = new JobVertex("Receiver");
+            receiver.setInvokableClass(CountDownLatchedAgnosticBinaryReceiver.class);
+            receiver.setParallelism(parallelism);
+
+            receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+                ResultPartitionType.PIPELINED);
+            receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+                ResultPartitionType.PIPELINED);
+
+            final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+            CountDownLatchedSender.setLatch(countDownLatch);
+            CountDownLatchedAgnosticBinaryReceiver.setLatch(countDownLatch);
+
+            final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender1.getID(), sender2.getID(), receiver.getID());
+            sender1.setSlotSharingGroup(sharingGroup);
+            sender2.setSlotSharingGroup(sharingGroup);
+            receiver.setSlotSharingGroup(sharingGroup);
+
+            final JobGraph jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver);
+
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+    }
+
+    /**
+     * A receiver that counts down the latch once it starts running.
+     */
+    public static class CountDownLatchedAgnosticBinaryReceiver extends AbstractInvokable {
+
+        private static CountDownLatch latch;
+
+        static void setLatch(CountDownLatch latch) {
+            CountDownLatchedAgnosticBinaryReceiver.latch = latch;
+        }
+
+        public CountDownLatchedAgnosticBinaryReceiver(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            latch.countDown();
+
+            RecordReader<IntValue> reader1 = new RecordReader<>(
+                getEnvironment().getInputGate(0),
+                IntValue.class,
+                getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+            RecordReader<IntValue> reader2 = new RecordReader<>(
+                getEnvironment().getInputGate(1),
+                IntValue.class,
+                getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+            while (reader1.next() != null) { }
+            while (reader2.next() != null) { }
+        }
+    }
+
+    @Test
+    public void testSlotSharingForForwardJobWithFailedTaskManager() throws Exception {
+        final int parallelism = 20;
+        final int numOfTaskManagers = 2;
+
+        final Configuration configuration = getDefaultConfiguration();
+        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
+
+        final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+            .setNumTaskManagers(numOfTaskManagers)
+            .setNumSlotsPerTaskManager(parallelism / numOfTaskManagers)
+            .setConfiguration(configuration)
+            .build();
+
+        try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+
+            final JobVertex sender = new JobVertex("Sender");
+            sender.setInvokableClass(Sender.class);
+            sender.setParallelism(parallelism);
+
+            final JobVertex receiver = new JobVertex("BlockingReceiver");
+            receiver.setInvokableClass(BlockingReceiver.class);
+            receiver.setParallelism(parallelism);
+
+            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+                ResultPartitionType.PIPELINED);
+
+            final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+            sender.setSlotSharingGroup(sharingGroup);
+            receiver.setSlotSharingGroup(sharingGroup);
+
+            final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+            final CompletableFuture<JobSubmissionResult> submissionFuture = miniCluster.submitJob(jobGraph);
+
+            final CompletableFuture<Void> failTaskManager = submissionFuture.thenRun(
+                () -> miniCluster.getTaskManagers()[0].shutDown());
Review comment:
shutdown is not a blocking operation. You have to wait for the termination future instead, presumably using `thenCompose()`.
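
Untested sketch of what I mean; I'm assuming here that `getTaskManagers()` hands back the `TaskExecutor` instances and that they expose their termination future via `getTerminationFuture()`:

```java
// Sketch only: trigger the shutdown, then wait for the TaskManager to actually
// terminate before completing failTaskManager.
// getTerminationFuture() is an assumption on my part.
final CompletableFuture<Void> failTaskManager = submissionFuture.thenCompose(
    ignored -> {
        final TaskExecutor taskExecutor = miniCluster.getTaskManagers()[0];
        taskExecutor.shutDown();
        return taskExecutor.getTerminationFuture();
    });
```

That way `failTaskManager` only completes once the TaskManager is actually gone, instead of racing with the shutdown.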