dawidwys commented on a change in pull request #15294:
URL: https://github.com/apache/flink/pull/15294#discussion_r604056590



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
##########
@@ -76,4 +76,8 @@ public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
      * in-flight data.
      */
     public abstract SubtaskStateMapper getDownstreamSubtaskStateMapper();
+
+    public boolean isPointwise() {

Review comment:
       nit: how about making this method `abstract`? That way, anyone adding a new
partitioner is forced to think about this property. I know it's unlikely, but
still :sweat_smile:

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
##########
@@ -99,87 +105,162 @@
 @RunWith(Parameterized.class)
 @Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
+    enum Topology implements DagCreator {
+        PIPELINE {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                final SingleOutputStreamOperator<Long> stream =
+                        env.fromSource(
+                                        new LongSource(
+                                                minCheckpoints,
+                                                parallelism,
+                                                expectedRestarts,
+                                                env.getCheckpointInterval()),
+                                        noWatermarks(),
+                                        "source")
+                                .slotSharingGroup(slotSharing ? "default" : 
"source")
+                                .disableChaining()
+                                .map(i -> checkHeader(i))
+                                .name("forward")
+                                .uid("forward")
+                                .slotSharingGroup(slotSharing ? "default" : 
"forward")
+                                .keyBy(i -> withoutHeader(i) % parallelism * 
parallelism)
+                                .process(new KeyedIdentityFunction())
+                                .name("keyed")
+                                .uid("keyed");
+                addFailingPipeline(minCheckpoints, slotSharing, stream);
+            }
+        },
+
+        MULTI_INPUT {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Long> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    final SingleOutputStreamOperator<Long> source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource =
+                            combinedSource == null
+                                    ? source
+                                    : combinedSource
+                                            .connect(source)
+                                            .flatMap(new MinEmittingFunction())
+                                            .name("min" + inputIndex)
+                                            .uid("min" + inputIndex)
+                                            .slotSharingGroup(
+                                                    slotSharing ? "default" : 
("min" + inputIndex));
+                }
 
-    @Parameterized.Parameters(name = "{0}")
-    public static Object[][] parameters() {
-        return new Object[][] {
-            new Object[] {
-                "non-parallel pipeline with local channels", 
createPipelineSettings(1, 1, true)
-            },
-            new Object[] {
-                "non-parallel pipeline with remote channels", 
createPipelineSettings(1, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with local channels, p = 5", 
createPipelineSettings(5, 5, true)
-            },
-            new Object[] {
-                "parallel pipeline with remote channels, p = 5", 
createPipelineSettings(5, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 5", 
createPipelineSettings(5, 3, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20",
-                createPipelineSettings(20, 10, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20, timeout=1",
-                createPipelineSettings(20, 10, true, 1)
-            },
-            new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
-            new Object[] {"Parallel cogroup, p = 10", 
createCogroupSettings(10)},
-            new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
-            new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
-        };
-    }
+                addFailingPipeline(minCheckpoints, slotSharing, 
combinedSource);
+            }
+        },
+
+        UNION {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Tuple2<Integer, Long>> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    int finalInputIndex = inputIndex;
+                    final SingleOutputStreamOperator<Tuple2<Integer, Long>> 
source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .map(i -> new Tuple2<>(finalInputIndex, 
checkHeader(i)))
+                                    .returns(
+                                            TypeInformation.of(
+                                                    new 
TypeHint<Tuple2<Integer, Long>>() {}))
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource = combinedSource == null ? source : 
combinedSource.union(source);
+                }
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing) {
-        return createPipelineSettings(parallelism, slotsPerTaskManager, 
slotSharing, 0);
-    }
+                final SingleOutputStreamOperator<Long> deduplicated =
+                        combinedSource
+                                .flatMap(new 
SourceAwareMinEmittingFunction(NUM_SOURCES))
+                                .name("min")
+                                .uid("min")
+                                .slotSharingGroup(slotSharing ? "default" : 
"min");
+                addFailingPipeline(minCheckpoints, slotSharing, deduplicated);
+            }
+        };
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing, int 
timeout) {
-        int numShuffles = 4;
-        return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline)
-                .setParallelism(parallelism)
-                .setSlotSharing(slotSharing)
-                .setNumSlots(slotSharing ? parallelism : parallelism * 
numShuffles)
-                .setSlotsPerTaskManager(slotsPerTaskManager)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1)
-                .setAlignmentTimeout(timeout);
+        @Override
+        public String toString() {
+            return name().toLowerCase();
+        }
     }
 
-    private static UnalignedSettings createCogroupSettings(int parallelism) {
-        int numShuffles = 10;
-        return new 
UnalignedSettings(UnalignedCheckpointITCase::createMultipleInputTopology)
-                .setParallelism(parallelism)
-                .setSlotSharing(true)
-                .setNumSlots(parallelism * numShuffles)
-                .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1);
+    @Parameterized.Parameters(name = "{0} with {2} channels, p = {1}, timeout 
= {3}")
+    public static Object[][] parameters() {
+        Object[] defaults = {Topology.PIPELINE, 1, MIXED, 0};
+
+        return Stream.of(
+                        new Object[] {Topology.PIPELINE, 1, LOCAL},
+                        new Object[] {Topology.PIPELINE, 1, REMOTE},
+                        new Object[] {Topology.PIPELINE, 5, LOCAL},
+                        new Object[] {Topology.PIPELINE, 5, REMOTE},
+                        new Object[] {Topology.PIPELINE, 20},
+                        new Object[] {Topology.PIPELINE, 20, MIXED, 1},
+                        new Object[] {Topology.MULTI_INPUT, 5},
+                        new Object[] {Topology.MULTI_INPUT, 10},
+                        new Object[] {Topology.UNION, 5},
+                        new Object[] {Topology.UNION, 10})
+                .map(params -> addDefaults(params, defaults))
+                .toArray(Object[][]::new);
     }

Review comment:
       Isn't it the same, @pnowojski? You should compare the old and new `parameters`
methods.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
##########
@@ -405,45 +412,60 @@ public void processElement(Long value, Context ctx, 
Collector<Long> out) {
         }
     }
 
-    private static class CountingMapFunction extends RichFlatMapFunction<Long, 
Long>
+    private static class SourceAwareMinEmittingFunction
+            extends RichFlatMapFunction<Tuple2<Integer, Long>, Long>
             implements CheckpointedFunction {
-        private BitSet seenRecords;
-
-        private final int withdrawnCount;
+        private final int numSources;
+        private State state;
 
-        private ListState<BitSet> stateList;
+        private ListState<State> stateList;
 
-        public CountingMapFunction(int numSources) {
-            withdrawnCount = numSources - 1;
+        public SourceAwareMinEmittingFunction(int numSources) {
+            this.numSources = numSources;
         }
 
         @Override
-        public void flatMap(Long value, Collector<Long> out) throws Exception {
-            long baseValue = withoutHeader(value);
-            final int offset = StrictMath.toIntExact(baseValue * 
withdrawnCount);
-            for (int index = 0; index < withdrawnCount; index++) {
-                if (!seenRecords.get(index + offset)) {
-                    seenRecords.set(index + offset);
+        public void flatMap(Tuple2<Integer, Long> sourceValue, Collector<Long> 
out)
+                throws Exception {
+            int source = sourceValue.f0;
+            long value = withoutHeader(sourceValue.f1);
+            int partition = (int) (value % 
getRuntimeContext().getNumberOfParallelSubtasks());
+            state.lastValues[source][partition] = value;
+            for (int index = 0; index < numSources; index++) {
+                if (state.lastValues[index][partition] < value) {
                     return;
                 }
             }
-            out.collect(value);
+            out.collect(sourceValue.f1);
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
             stateList.clear();
-            stateList.add(seenRecords);
+            stateList.add(state);
         }
 
         @Override
         public void initializeState(FunctionInitializationContext context) 
throws Exception {
             stateList =
                     context.getOperatorStateStore()
-                            .getListState(
-                                    new ListStateDescriptor<>(
-                                            "state", new 
GenericTypeInfo<>(BitSet.class)));
-            seenRecords = getOnlyElement(stateList.get(), new BitSet());
+                            .getListState(new ListStateDescriptor<>("state", 
State.class));
+            state =
+                    getOnlyElement(
+                            stateList.get(),
+                            new State(
+                                    numSources, 
getRuntimeContext().getNumberOfParallelSubtasks()));
+        }
+
+        private static class State {
+            private long[][] lastValues;

Review comment:
       nit: `lastValues` can be declared `final`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
##########
@@ -39,6 +39,14 @@
  */
 public class CheckpointOptions implements Serializable {
 
+    /** How a checkpoint should be aligned. */
+    public enum AlignmentType {

Review comment:
       Nice change! Improves readability a lot!

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapper.java
##########
@@ -183,6 +183,15 @@ public boolean isAmbiguous() {
             }
             return subtasks.toArray();
         }
+    },
+
+    UNSUPPORTED {
+        @Override
+        public int[] getOldSubtasks(
+                int newSubtaskIndex, int oldNumberOfSubtasks, int 
newNumberOfSubtasks) {
+            throw new UnsupportedOperationException(
+                    "Cannot rescale the given pointwise partitioner. Did you 
change the partitioner to forward or rescale? It may also help to add an 
explicit shuffle().");

Review comment:
       nit: please split this long exception message across multiple lines.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
##########
@@ -99,87 +104,156 @@
 @RunWith(Parameterized.class)
 @Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
+    enum Topology implements DagCreator {
+        PIPELINE {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                final SingleOutputStreamOperator<Long> stream =
+                        env.fromSource(
+                                        new LongSource(
+                                                minCheckpoints,
+                                                parallelism,
+                                                expectedRestarts,
+                                                env.getCheckpointInterval()),
+                                        noWatermarks(),
+                                        "source")
+                                .slotSharingGroup(slotSharing ? "default" : 
"source")
+                                .disableChaining()
+                                .map(i -> checkHeader(i))
+                                .name("forward")
+                                .uid("forward")
+                                .slotSharingGroup(slotSharing ? "default" : 
"forward")
+                                .keyBy(i -> withoutHeader(i) % parallelism * 
parallelism)
+                                .process(new KeyedIdentityFunction())
+                                .name("keyed")
+                                .uid("keyed");
+                addFailingPipeline(minCheckpoints, slotSharing, stream);
+            }
+        },
+
+        MULTI_INPUT {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Long> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    final SingleOutputStreamOperator<Long> source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource =
+                            combinedSource == null
+                                    ? source
+                                    : combinedSource
+                                            .connect(source)
+                                            .flatMap(new MinEmittingFunction())
+                                            .name("min" + inputIndex)
+                                            .uid("min" + inputIndex)
+                                            .slotSharingGroup(
+                                                    slotSharing ? "default" : 
("min" + inputIndex));
+                }
 
-    @Parameterized.Parameters(name = "{0}")
-    public static Object[][] parameters() {
-        return new Object[][] {
-            new Object[] {
-                "non-parallel pipeline with local channels", 
createPipelineSettings(1, 1, true)
-            },
-            new Object[] {
-                "non-parallel pipeline with remote channels", 
createPipelineSettings(1, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with local channels, p = 5", 
createPipelineSettings(5, 5, true)
-            },
-            new Object[] {
-                "parallel pipeline with remote channels, p = 5", 
createPipelineSettings(5, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 5", 
createPipelineSettings(5, 3, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20",
-                createPipelineSettings(20, 10, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20, timeout=1",
-                createPipelineSettings(20, 10, true, 1)
-            },
-            new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
-            new Object[] {"Parallel cogroup, p = 10", 
createCogroupSettings(10)},
-            new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
-            new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
-        };
-    }
+                addFailingPipeline(minCheckpoints, slotSharing, 
combinedSource);
+            }
+        },
+
+        UNION {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Long> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    final SingleOutputStreamOperator<Long> source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource = combinedSource == null ? source : 
combinedSource.union(source);
+                }
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing) {
-        return createPipelineSettings(parallelism, slotsPerTaskManager, 
slotSharing, 0);
-    }
+                final SingleOutputStreamOperator<Long> deduplicated =
+                        combinedSource
+                                .partitionCustom(
+                                        (key, numPartitions) ->
+                                                (int) (withoutHeader(key) % 
numPartitions),
+                                        l -> l)
+                                .flatMap(new CountingMapFunction(NUM_SOURCES));
+                addFailingPipeline(minCheckpoints, slotSharing, deduplicated);
+            }
+        };
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing, int 
timeout) {
-        int numShuffles = 4;
-        return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline)
-                .setParallelism(parallelism)
-                .setSlotSharing(slotSharing)
-                .setNumSlots(slotSharing ? parallelism : parallelism * 
numShuffles)
-                .setSlotsPerTaskManager(slotsPerTaskManager)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1)
-                .setAlignmentTimeout(timeout);
+        @Override
+        public String toString() {
+            return name().toLowerCase();
+        }
     }
 
-    private static UnalignedSettings createCogroupSettings(int parallelism) {
-        int numShuffles = 10;
-        return new 
UnalignedSettings(UnalignedCheckpointITCase::createMultipleInputTopology)
-                .setParallelism(parallelism)
-                .setSlotSharing(true)
-                .setNumSlots(parallelism * numShuffles)
-                .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1);
+    @Parameterized.Parameters(name = "{0} with {2} channels, p = {1}, timeout 
= {3}")
+    public static Object[][] parameters() {
+        Object[] defaults = {Topology.PIPELINE, 1, MIXED, 0};
+
+        return Stream.of(
+                        new Object[] {Topology.PIPELINE, 1, LOCAL},
+                        new Object[] {Topology.PIPELINE, 1, REMOTE},
+                        new Object[] {Topology.PIPELINE, 5, LOCAL},
+                        new Object[] {Topology.PIPELINE, 5, REMOTE},
+                        new Object[] {Topology.PIPELINE, 20},
+                        new Object[] {Topology.PIPELINE, 20, MIXED, 1},
+                        new Object[] {Topology.MULTI_INPUT, 5},
+                        new Object[] {Topology.MULTI_INPUT, 10},
+                        new Object[] {Topology.UNION, 5},
+                        new Object[] {Topology.UNION, 10})
+                .map(params -> addDefaults(params, defaults))
+                .toArray(Object[][]::new);
     }
 
-    private static UnalignedSettings createUnionSettings(int parallelism) {
-        int numShuffles = 6;
-        return new 
UnalignedSettings(UnalignedCheckpointITCase::createUnionTopology)
-                .setParallelism(parallelism)
-                .setSlotSharing(true)
-                .setNumSlots(parallelism * numShuffles)
-                .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1);
+    private static Object[] addDefaults(Object[] params, Object[] defaults) {
+        return ArrayUtils.addAll(
+                params, ArrayUtils.subarray(defaults, params.length, 
defaults.length));
     }
 
     private final UnalignedSettings settings;
 
-    public UnalignedCheckpointITCase(String desc, UnalignedSettings settings) {
-        this.settings = settings;
+    public UnalignedCheckpointITCase(
+            Topology topology, int parallelism, ChannelType channelType, int 
timeout) {
+        settings =
+                new UnalignedSettings(topology)
+                        .setParallelism(parallelism)
+                        .setChannelTypes(channelType)
+                        .setExpectedFailures(5)
+                        .setFailuresAfterSourceFinishes(1)
+                        .setAlignmentTimeout(timeout);
     }
 
-    @Test(timeout = 60_000)
+    @Test

Review comment:
       Shouldn't we keep the timeout (`@Test(timeout = 60_000)`)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to