zentol commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r255935106
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 ##########
 @@ -337,6 +347,262 @@ public void testSchedulingAllAtOnce() throws Exception {
                }
        }
 
+       @Test
+       public void testSlotSharingForForwardJobWithCoLocationConstraint() throws Exception {
+               testSlotSharingForForwardJob(true);
+       }
+
+       @Test
+       public void testSlotSharingForForwardJobWithoutCoLocationConstraint() throws Exception {
+               testSlotSharingForForwardJob(false);
+       }
+
+       /**
+        * This job runs in N slots with N senders and N receivers.
+        * Unless slot sharing is used, it cannot complete.
+        * Whether or not the co-location constraint is used should make no difference.
+        */
+       private synchronized void testSlotSharingForForwardJob(boolean withCoLocationConstraint) throws Exception {
+               final int parallelism = 11;
+
+               final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       sender.setInvokableClass(CountDownLatchedSender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(CountDownLatchedReceiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+                       CountDownLatchedSender.setLatch(countDownLatch);
+                       CountDownLatchedReceiver.setLatch(countDownLatch);
+
+                       final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+                       sender.setSlotSharingGroup(sharingGroup);
+                       receiver.setSlotSharingGroup(sharingGroup);
+
+                       if (withCoLocationConstraint) {
+                               receiver.setStrictlyCoLocatedWith(sender);
+                       }
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+                       miniCluster.executeJobBlocking(jobGraph);
+               }
+       }
+
+       /**
+        * A sender that does not exit until all receivers are running.
+        */
+       public static class CountDownLatchedSender extends AbstractInvokable {
+
+               private static CountDownLatch latch;
+
+               static void setLatch(CountDownLatch latch) {
+                       CountDownLatchedSender.latch = latch;
+               }
+
+               public CountDownLatchedSender(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+
+                       try {
+                               writer.emit(new IntValue(42));
+                               writer.emit(new IntValue(1337));
+                               writer.flushAll();
+                       } finally {
+                               writer.clearBuffers();
+                               latch.await();
+                       }
+               }
+       }
+
+       /**
+        * A receiver that counts down the latch once it starts running.
+        */
+       public static class CountDownLatchedReceiver extends AbstractInvokable {
+
+               private static CountDownLatch latch;
+
+               static void setLatch(CountDownLatch latch) {
+                       CountDownLatchedReceiver.latch = latch;
+               }
+
+               public CountDownLatchedReceiver(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       latch.countDown();
+
+                       RecordReader<IntValue> reader = new RecordReader<>(
+                               getEnvironment().getInputGate(0),
+                               IntValue.class,
+                               getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+                       IntValue i1 = reader.next();
+                       IntValue i2 = reader.next();
+                       IntValue i3 = reader.next();
+
+                       if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+                               throw new Exception("Wrong data received.");
+                       }
+               }
+       }
+
+       /**
+        * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
+        * it cannot complete.
+        */
+       @Test
+       public void testSlotSharingForTwoInputsJob() throws Exception {
+               final int parallelism = 11;
+
+               final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender1 = new JobVertex("Sender1");
+                       sender1.setInvokableClass(CountDownLatchedSender.class);
+                       sender1.setParallelism(parallelism);
+
+                       final JobVertex sender2 = new JobVertex("Sender2");
+                       sender2.setInvokableClass(CountDownLatchedSender.class);
+                       sender2.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(CountDownLatchedAgnosticBinaryReceiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+                       receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+                               ResultPartitionType.PIPELINED);
+
+                       final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+                       CountDownLatchedSender.setLatch(countDownLatch);
 
 Review comment:
   FYI: if the test had failed, another test could have been blocked indefinitely since you aren't calling unlock in a finally block.
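   A minimal sketch of the suggested pattern, releasing the lock in a finally block so a failing test cannot leave it held (the TEST_LOCK field and runJob() method below are hypothetical stand-ins; the actual lock from the PR is not visible in this excerpt):

       import java.util.concurrent.locks.ReentrantLock;

       public class FinallyUnlockSketch {

           // hypothetical lock shared between tests (stand-in for the lock referenced above)
           private static final ReentrantLock TEST_LOCK = new ReentrantLock();

           public void runGuardedTest() throws Exception {
               TEST_LOCK.lock();
               try {
                   // test body that may throw, e.g. submitting and awaiting a job
                   runJob();
               } finally {
                   // always released, even when the test fails, so later tests cannot block indefinitely
                   TEST_LOCK.unlock();
               }
           }

           private void runJob() throws Exception {
               // placeholder for the actual test logic
           }
       }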

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
