dawidwys commented on a change in pull request #16135:
URL: https://github.com/apache/flink/pull/16135#discussion_r654245153



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -255,7 +255,6 @@ public void processBarrierAnnouncement(
             CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo)
             throws IOException {
         if (checkNewCheckpoint(announcedBarrier)) {
-            firstBarrierArrivalTime = getClock().relativeTimeNanos();
             if (alternating) {

Review comment:
       Shouldn't we also register the alignment timer on the first barrier if there were no announcements for that barrier? I think we could move this block into the `checkNewCheckpoint` method.
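A minimal sketch of the suggestion, assuming a heavily simplified handler. `checkNewCheckpoint`, `processBarrierAnnouncement` and `firstBarrierArrivalTime` mirror names from the diff. The signatures, `processBarrier`, `registeredTimers` and the nanosecond arguments are hypothetical stand-ins, not Flink's actual `SingleCheckpointBarrierHandler` API. Because the bookkeeping lives in `checkNewCheckpoint`, the timer is registered exactly once per checkpoint, whichever event arrives first.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (NOT Flink's SingleCheckpointBarrierHandler):
 * the alignment timer is started in one shared place, so it is registered
 * whether an announcement or a barrier is the first event for a checkpoint.
 */
class AlignmentTimerSketch {
    private long currentCheckpointId = -1;
    private long firstBarrierArrivalTime = -1;
    // Checkpoint ids for which an alignment timer was registered (hypothetical).
    final List<Long> registeredTimers = new ArrayList<>();

    /** Returns true only the first time a given checkpoint id is seen. */
    private boolean checkNewCheckpoint(long checkpointId, long nowNanos) {
        if (checkpointId <= currentCheckpointId) {
            return false; // already tracking this (or a newer) checkpoint
        }
        currentCheckpointId = checkpointId;
        // Shared bookkeeping for both entry points below.
        firstBarrierArrivalTime = nowNanos;
        registeredTimers.add(checkpointId);
        return true;
    }

    void processBarrierAnnouncement(long checkpointId, long nowNanos) {
        checkNewCheckpoint(checkpointId, nowNanos);
    }

    void processBarrier(long checkpointId, long nowNanos) {
        checkNewCheckpoint(checkpointId, nowNanos);
    }

    long getFirstBarrierArrivalTime() {
        return firstBarrierArrivalTime;
    }

    public static void main(String[] args) {
        AlignmentTimerSketch handler = new AlignmentTimerSketch();
        handler.processBarrierAnnouncement(1, 100); // announcement arrives first
        handler.processBarrier(1, 150);             // same checkpoint: no second timer
        handler.processBarrier(2, 300);             // checkpoint 2 has no announcement
        System.out.println(handler.registeredTimers);             // [1, 2]
        System.out.println(handler.getFirstBarrierArrivalTime()); // 300
    }
}
```

Checkpoint 2 never receives an announcement, yet its timer is still registered, which is exactly the case the comment is concerned about.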

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
         env.enableCheckpointing(10);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         env.setRestartStrategy(fixedDelayRestart(1, 0));
 
         env.addSource(new NumberSource(lastCheckpointValue))
-                .shuffle()
+                .rebalance()

Review comment:
       Why is this necessary?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
         env.enableCheckpointing(10);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         env.setRestartStrategy(fixedDelayRestart(1, 0));
 
         env.addSource(new NumberSource(lastCheckpointValue))
-                .shuffle()
+                .rebalance()

Review comment:
       Why can't we use the default `forward` partitioner then?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -255,7 +255,6 @@ public void processBarrierAnnouncement(
             CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo)
             throws IOException {
         if (checkNewCheckpoint(announcedBarrier)) {
-            firstBarrierArrivalTime = getClock().relativeTimeNanos();
             if (alternating) {

Review comment:
       I think that's a valid, yet unrelated, issue. It applies equally whether we first receive an announcement or a barrier for a given checkpoint.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -255,7 +255,6 @@ public void processBarrierAnnouncement(
             CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo)
             throws IOException {
         if (checkNewCheckpoint(announcedBarrier)) {
-            firstBarrierArrivalTime = getClock().relativeTimeNanos();
             if (alternating) {

Review comment:
       cc @pnowojski 

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
         env.enableCheckpointing(10);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         env.setRestartStrategy(fixedDelayRestart(1, 0));
 
         env.addSource(new NumberSource(lastCheckpointValue))
-                .shuffle()
+                .rebalance()

Review comment:
       Does your source have lower parallelism than the following map operator? I don't think so. Using `ForwardPartitioner` will result in subtask 0 of the source pushing to subtask 0 of the map operator, and so on. If your only goal is to have each of the map operators receive data, I think that's enough, isn't it?

   Rebalance means that each source subtask will send records to every map subtask.
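To make the difference concrete, here is a toy model (hypothetical class and method names, not Flink's partitioner classes) of which map subtasks a single source subtask reaches when source and map both run with an assumed parallelism of 4. Forward keeps the one-to-one index mapping; rebalance cycles over every downstream channel. The sketch starts the round-robin at channel 0 so its output is deterministic; the real `RebalancePartitioner` may start from a different channel.

```java
import java.util.TreeSet;

/**
 * Toy model (not Flink code) of which map subtasks one source subtask writes to.
 * Forward: source subtask i always writes to map subtask i.
 * Rebalance: records are spread round-robin over every map subtask.
 */
class PartitionerSketch {

    /** Forward-style selection: always the channel with the same index. */
    static int forward(int sourceSubtask) {
        return sourceSubtask;
    }

    /** Round-robin selection, starting at channel 0 for determinism. */
    static final class RoundRobin {
        private int next = -1;

        int select(int numChannels) {
            next = (next + 1) % numChannels;
            return next;
        }
    }

    /** Distinct map subtasks reached by the given number of records under forward. */
    static TreeSet<Integer> targetsForward(int sourceSubtask, int records) {
        TreeSet<Integer> targets = new TreeSet<>();
        for (int r = 0; r < records; r++) {
            targets.add(forward(sourceSubtask));
        }
        return targets;
    }

    /** Distinct map subtasks reached by the given number of records under rebalance. */
    static TreeSet<Integer> targetsRebalance(int numChannels, int records) {
        RoundRobin roundRobin = new RoundRobin();
        TreeSet<Integer> targets = new TreeSet<>();
        for (int r = 0; r < records; r++) {
            targets.add(roundRobin.select(numChannels));
        }
        return targets;
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumed equal for source and map
        System.out.println(targetsForward(0, 10));             // [0]
        System.out.println(targetsRebalance(parallelism, 10)); // [0, 1, 2, 3]
    }
}
```

With equal parallelism, forward still gives every map subtask data (each from its own source subtask), which is the reviewer's point; rebalance only adds all-to-all wiring on top.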

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
         env.enableCheckpointing(10);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         env.setRestartStrategy(fixedDelayRestart(1, 0));
 
         env.addSource(new NumberSource(lastCheckpointValue))
-                .shuffle()
+                .rebalance()

Review comment:
       Are you sure? Where do you set the parallelism to `1` for the source? I can't find it.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
         env.enableCheckpointing(10);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         env.setRestartStrategy(fixedDelayRestart(1, 0));
 
         env.addSource(new NumberSource(lastCheckpointValue))
-                .shuffle()
+                .rebalance()

Review comment:
       Right, I keep forgetting about the `ParallelSourceFunction` :facepalm: It's such a design failure, imo.
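The point that resolves this part of the thread can be modelled in a few lines, assuming (as the exchange implies) that `NumberSource` implements plain `SourceFunction` rather than `ParallelSourceFunction`, and that the map operator runs at a hypothetical default parallelism of 4. A non-parallel source always runs as a single subtask, so a one-to-one forward edge to the map operator is not possible; as far as I know, Flink rejects an explicit `forward()` between operators of different parallelism. Class and method names below are hypothetical.

```java
/**
 * Hypothetical model (not Flink code): a source that is not a
 * ParallelSourceFunction runs with parallelism 1, so a forward edge to a
 * map operator running at a higher default parallelism cannot be used.
 */
class SourceParallelismSketch {

    /** Effective source parallelism: non-parallel sources are pinned to 1. */
    static int sourceParallelism(boolean isParallelSource, int defaultParallelism) {
        return isParallelSource ? defaultParallelism : 1;
    }

    /** A forward (one-to-one) edge only works when both sides have equal parallelism. */
    static boolean forwardPossible(int upstreamParallelism, int downstreamParallelism) {
        return upstreamParallelism == downstreamParallelism;
    }

    public static void main(String[] args) {
        int defaultParallelism = 4; // hypothetical default for the map operator
        int plainSource = sourceParallelism(false, defaultParallelism); // plain SourceFunction
        System.out.println(plainSource);                                      // 1
        System.out.println(forwardPossible(plainSource, defaultParallelism)); // false
        int parallelSource = sourceParallelism(true, defaultParallelism);
        System.out.println(forwardPossible(parallelSource, defaultParallelism)); // true
    }
}
```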

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
         env.enableCheckpointing(10);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         env.setRestartStrategy(fixedDelayRestart(1, 0));
 
         env.addSource(new NumberSource(lastCheckpointValue))
-                .shuffle()
+                .rebalance()

Review comment:
       Still, I believe you don't need to specify the partitioning explicitly. If you remove the `rebalance` altogether, it will use the `RescalePartitioner`, which behaves the same as the `RebalancePartitioner` in this case, because you have just a single source task.

   If you do believe a `rebalance` is better, add a comment explaining why you need it there. Its role is not straightforward to figure out.
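The claim that rescale and rebalance coincide here can be checked with a small model of which downstream subtasks each upstream subtask can reach. It is a sketch under stated assumptions: the contiguous-slice formula for rescale is a simplified stand-in for Flink's pointwise wiring, it only handles an upstream parallelism no larger than the downstream one, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not Flink code) comparing rescale and rebalance fan-out.
 * Rebalance: every upstream subtask can reach ALL downstream subtasks.
 * Rescale: every upstream subtask reaches only its own contiguous slice of
 * downstream subtasks. Assumes upstream parallelism <= downstream parallelism.
 */
class RescaleVsRebalanceSketch {

    /** Downstream subtasks reachable from one upstream subtask under rescale. */
    static List<Integer> rescaleTargets(int upstreamIndex, int up, int down) {
        if (up > down) {
            throw new IllegalArgumentException("sketch only models up <= down");
        }
        int start = upstreamIndex * down / up;
        int end = (upstreamIndex + 1) * down / up;
        List<Integer> targets = new ArrayList<>();
        for (int i = start; i < end; i++) {
            targets.add(i);
        }
        return targets;
    }

    /** Downstream subtasks reachable from any upstream subtask under rebalance. */
    static List<Integer> rebalanceTargets(int down) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < down; i++) {
            targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        // One source subtask feeding four map subtasks: both reach everything.
        System.out.println(rescaleTargets(0, 1, 4)); // [0, 1, 2, 3]
        System.out.println(rebalanceTargets(4));     // [0, 1, 2, 3]
        // Two source subtasks feeding four map subtasks: rescale splits them.
        System.out.println(rescaleTargets(0, 2, 4)); // [0, 1]
        System.out.println(rescaleTargets(1, 2, 4)); // [2, 3]
    }
}
```

With a single upstream subtask the rescale slice is the whole downstream range, so both strategies fan out to every map subtask; they only diverge once the source has more than one subtask.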




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
