Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5697#discussion_r177795751
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
+
+public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
+
+ private static final int NUMBER_OF_TMS = 2;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+ private static TestingCluster flink;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ flink = TestingUtils.startTestingCluster(
+ NUMBER_OF_SLOTS_PER_TM,
+ NUMBER_OF_TMS,
+ TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ flink.stop();
+ }
+
+ /**
+ * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
+ * result.
+ *
+ * <pre>
+ * +----------+
+ * +-- pipelined -> | Receiver |
+ * +--------+ | +----------+
+ * | Sender |-|
+ * +--------+ | +----------+
+ * +-- blocking --> | Receiver |
+ * +----------+
+ * </pre>
+ *
+ * The pipelined receiver gets deployed after the first buffer is available and the blocking
+ * one after all subtasks are finished.
+ */
+ @Test
+ public void testMixedPipelinedAndBlockingResults() throws Exception {
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
+ sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
+ sender.setParallelism(PARALLELISM);
+
+ final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
+ pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+ pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+ pipelinedReceiver.setParallelism(PARALLELISM);
+
+ pipelinedReceiver.connectNewDataSetAsInput(
+ sender,
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED);
+
+ final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
+ blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+ blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+ blockingReceiver.setParallelism(PARALLELISM);
+
+ blockingReceiver.connectNewDataSetAsInput(sender,
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.BLOCKING);
+
+ SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+ sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
+
+ sender.setSlotSharingGroup(slotSharingGroup);
+ pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
+ blockingReceiver.setSlotSharingGroup(slotSharingGroup);
+
+ final JobGraph jobGraph = new JobGraph(
+ "Mixed pipelined and blocking result",
+ sender,
+ pipelinedReceiver,
+ blockingReceiver);
--- End diff --
Could we deduplicate this code?
---