dawidwys commented on a change in pull request #15294:
URL: https://github.com/apache/flink/pull/15294#discussion_r604172595



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
##########
@@ -99,87 +105,162 @@
 @RunWith(Parameterized.class)
 @Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
+    enum Topology implements DagCreator {
+        PIPELINE {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                final SingleOutputStreamOperator<Long> stream =
+                        env.fromSource(
+                                        new LongSource(
+                                                minCheckpoints,
+                                                parallelism,
+                                                expectedRestarts,
+                                                env.getCheckpointInterval()),
+                                        noWatermarks(),
+                                        "source")
+                                .slotSharingGroup(slotSharing ? "default" : 
"source")
+                                .disableChaining()
+                                .map(i -> checkHeader(i))
+                                .name("forward")
+                                .uid("forward")
+                                .slotSharingGroup(slotSharing ? "default" : 
"forward")
+                                .keyBy(i -> withoutHeader(i) % parallelism * 
parallelism)
+                                .process(new KeyedIdentityFunction())
+                                .name("keyed")
+                                .uid("keyed");
+                addFailingPipeline(minCheckpoints, slotSharing, stream);
+            }
+        },
+
+        MULTI_INPUT {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Long> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    final SingleOutputStreamOperator<Long> source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource =
+                            combinedSource == null
+                                    ? source
+                                    : combinedSource
+                                            .connect(source)
+                                            .flatMap(new MinEmittingFunction())
+                                            .name("min" + inputIndex)
+                                            .uid("min" + inputIndex)
+                                            .slotSharingGroup(
+                                                    slotSharing ? "default" : 
("min" + inputIndex));
+                }
 
-    @Parameterized.Parameters(name = "{0}")
-    public static Object[][] parameters() {
-        return new Object[][] {
-            new Object[] {
-                "non-parallel pipeline with local channels", 
createPipelineSettings(1, 1, true)
-            },
-            new Object[] {
-                "non-parallel pipeline with remote channels", 
createPipelineSettings(1, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with local channels, p = 5", 
createPipelineSettings(5, 5, true)
-            },
-            new Object[] {
-                "parallel pipeline with remote channels, p = 5", 
createPipelineSettings(5, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 5", 
createPipelineSettings(5, 3, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20",
-                createPipelineSettings(20, 10, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20, timeout=1",
-                createPipelineSettings(20, 10, true, 1)
-            },
-            new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
-            new Object[] {"Parallel cogroup, p = 10", 
createCogroupSettings(10)},
-            new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
-            new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
-        };
-    }
+                addFailingPipeline(minCheckpoints, slotSharing, 
combinedSource);
+            }
+        },
+
+        UNION {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Tuple2<Integer, Long>> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    int finalInputIndex = inputIndex;
+                    final SingleOutputStreamOperator<Tuple2<Integer, Long>> 
source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .map(i -> new Tuple2<>(finalInputIndex, 
checkHeader(i)))
+                                    .returns(
+                                            TypeInformation.of(
+                                                    new 
TypeHint<Tuple2<Integer, Long>>() {}))
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource = combinedSource == null ? source : 
combinedSource.union(source);
+                }
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing) {
-        return createPipelineSettings(parallelism, slotsPerTaskManager, 
slotSharing, 0);
-    }
+                final SingleOutputStreamOperator<Long> deduplicated =
+                        combinedSource
+                                .flatMap(new 
SourceAwareMinEmittingFunction(NUM_SOURCES))
+                                .name("min")
+                                .uid("min")
+                                .slotSharingGroup(slotSharing ? "default" : 
"min");
+                addFailingPipeline(minCheckpoints, slotSharing, deduplicated);
+            }
+        };
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing, int 
timeout) {
-        int numShuffles = 4;
-        return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline)
-                .setParallelism(parallelism)
-                .setSlotSharing(slotSharing)
-                .setNumSlots(slotSharing ? parallelism : parallelism * 
numShuffles)
-                .setSlotsPerTaskManager(slotsPerTaskManager)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1)
-                .setAlignmentTimeout(timeout);
+        @Override
+        public String toString() {
+            return name().toLowerCase();
+        }
     }
 
-    private static UnalignedSettings createCogroupSettings(int parallelism) {
-        int numShuffles = 10;
-        return new 
UnalignedSettings(UnalignedCheckpointITCase::createMultipleInputTopology)
-                .setParallelism(parallelism)
-                .setSlotSharing(true)
-                .setNumSlots(parallelism * numShuffles)
-                .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1);
+    @Parameterized.Parameters(name = "{0} with {2} channels, p = {1}, timeout 
= {3}")
+    public static Object[][] parameters() {
+        Object[] defaults = {Topology.PIPELINE, 1, MIXED, 0};
+
+        return Stream.of(
+                        new Object[] {Topology.PIPELINE, 1, LOCAL},
+                        new Object[] {Topology.PIPELINE, 1, REMOTE},
+                        new Object[] {Topology.PIPELINE, 5, LOCAL},
+                        new Object[] {Topology.PIPELINE, 5, REMOTE},
+                        new Object[] {Topology.PIPELINE, 20},
+                        new Object[] {Topology.PIPELINE, 20, MIXED, 1},
+                        new Object[] {Topology.MULTI_INPUT, 5},
+                        new Object[] {Topology.MULTI_INPUT, 10},
+                        new Object[] {Topology.UNION, 5},
+                        new Object[] {Topology.UNION, 10})
+                .map(params -> addDefaults(params, defaults))
+                .toArray(Object[][]::new);
     }

Review comment:
       I see... good point!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to