akalash commented on a change in pull request #16135:
URL: https://github.com/apache/flink/pull/16135#discussion_r654276097
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -255,7 +255,6 @@ public void processBarrierAnnouncement(
CheckpointBarrier announcedBarrier, int sequenceNumber,
InputChannelInfo channelInfo)
throws IOException {
if (checkNewCheckpoint(announcedBarrier)) {
- firstBarrierArrivalTime = getClock().relativeTimeNanos();
if (alternating) {
Review comment:
Good point. I believe it indeed makes sense to move it to
`checkNewCheckpoint`
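Here is a minimal sketch of the idea, assuming a heavily simplified handler. `BarrierTimingSketch`, its fields and the injected `LongSupplier` clock are hypothetical stand-ins for `SingleCheckpointBarrierHandler` and `getClock().relativeTimeNanos()`, not Flink code. If the timestamp is recorded inside `checkNewCheckpoint`, every entry point that starts a checkpoint (a barrier or an announcement) sets it exactly once.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified model (not Flink's SingleCheckpointBarrierHandler) that records
 * the first-barrier arrival time inside checkNewCheckpoint.
 */
public class BarrierTimingSketch {
    private final LongSupplier clockNanos; // stands in for getClock().relativeTimeNanos()
    private long currentCheckpointId = -1L;
    private long firstBarrierArrivalTime = Long.MAX_VALUE;

    public BarrierTimingSketch(LongSupplier clockNanos) {
        this.clockNanos = clockNanos;
    }

    /** Returns true only for a barrier of a newer checkpoint and records the arrival time then. */
    public boolean checkNewCheckpoint(long barrierId) {
        if (barrierId <= currentCheckpointId) {
            return false; // current or older checkpoint: keep the existing timestamp
        }
        currentCheckpointId = barrierId;
        firstBarrierArrivalTime = clockNanos.getAsLong();
        return true;
    }

    // Both entry points share the timestamp logic instead of setting it themselves.
    public void processBarrier(long barrierId) {
        if (checkNewCheckpoint(barrierId)) {
            // ... start alignment ...
        }
    }

    public void processBarrierAnnouncement(long barrierId) {
        if (checkNewCheckpoint(barrierId)) {
            // ... register the announcement ...
        }
    }

    public long getFirstBarrierArrivalTime() {
        return firstBarrierArrivalTime;
    }

    public static void main(String[] args) {
        long[] now = {100L};
        BarrierTimingSketch handler = new BarrierTimingSketch(() -> now[0]);
        handler.processBarrierAnnouncement(1L); // announcement starts checkpoint 1 at t=100
        now[0] = 250L;
        handler.processBarrier(1L);             // same checkpoint: timestamp unchanged
        System.out.println(handler.getFirstBarrierArrivalTime()); // 100
    }
}
```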
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery()
{
env.enableCheckpointing(10);
env.disableOperatorChaining();
env.getCheckpointConfig().enableUnalignedCheckpoints();
+ env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
env.setRestartStrategy(fixedDelayRestart(1, 0));
env.addSource(new NumberSource(lastCheckpointValue))
- .shuffle()
+ .rebalance()
Review comment:
This test should guarantee that all three subtasks receive at least
one record. Right now the source emits fifty records, which is enough to make
this test work even with `shuffle`. Strictly speaking, though, `shuffle` doesn't
make sense here, while `rebalance` does exactly what we need. So this change is
not strictly necessary, but it makes the behaviour of this test more correct.
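To illustrate the difference, here is a plain-Java model (hypothetical names, no Flink dependency) of the two strategies as they are usually described: `rebalance` distributes records round-robin, while `shuffle` picks a channel uniformly at random for each record. Round-robin reaches every channel once there are at least as many records as channels. Random choice only makes that very likely.

```java
import java.util.Arrays;
import java.util.Random;

/** Hypothetical model of channel selection from one upstream subtask to n downstream subtasks. */
public class PartitioningSketch {

    /** Round-robin (rebalance-style): record i goes to channel (start + i) % channels. */
    public static int[] roundRobinCounts(int records, int channels, int start) {
        int[] counts = new int[channels];
        int next = start % channels;
        for (int i = 0; i < records; i++) {
            counts[next]++;
            next = (next + 1) % channels;
        }
        return counts;
    }

    /** Uniformly random channel per record (shuffle-style). */
    public static int[] randomCounts(int records, int channels, Random random) {
        int[] counts = new int[channels];
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(channels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 50 records and 3 map subtasks, as in the test.
        System.out.println(Arrays.toString(roundRobinCounts(50, 3, 0))); // [17, 17, 16]
        System.out.println(Arrays.toString(randomCounts(50, 3, new Random(42))));
    }
}
```

With 50 records and 3 channels, the random variant leaves at least one channel empty with probability at most 3·(2/3)^50, which is roughly 5·10^-9. That explains why the test also passes with `shuffle`. Round-robin makes the guarantee deterministic.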
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -255,7 +255,6 @@ public void processBarrierAnnouncement(
CheckpointBarrier announcedBarrier, int sequenceNumber,
InputChannelInfo channelInfo)
throws IOException {
if (checkNewCheckpoint(announcedBarrier)) {
- firstBarrierArrivalTime = getClock().relativeTimeNanos();
if (alternating) {
Review comment:
As I understand it, this will only partially work anyway: if we
receive the first barrier from a local channel (which sends no announcement) and
the timeout happens, then only barriers that have announcements can be
prioritized, so if the other channel is also local there will be no effect.
Do you think it is a problem if we can prioritize barriers only for remote
channels (which do send announcements)?
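Here is a toy model of this concern. It assumes, as the comment states, that only remote channels send barrier announcements. After the alignment timeout, only pending barriers on announced (remote) channels are candidates for prioritization, so a pending local channel gains nothing. All names are hypothetical, and this is not how Flink tracks channels internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model: which pending barriers could be prioritized after a timeout. */
public class AnnouncementSketch {

    public enum Kind { LOCAL, REMOTE }

    public static final class Channel {
        final String name;
        final Kind kind;
        final boolean barrierReceived;

        public Channel(String name, Kind kind, boolean barrierReceived) {
            this.name = name;
            this.kind = kind;
            this.barrierReceived = barrierReceived;
        }
    }

    /** Assumption: only remote channels announce barriers, so only they can be prioritized. */
    public static List<String> prioritizable(List<Channel> channels) {
        List<String> result = new ArrayList<>();
        for (Channel c : channels) {
            boolean announced = c.kind == Kind.REMOTE;
            if (!c.barrierReceived && announced) {
                result.add(c.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Channel> channels = List.of(
                new Channel("local-0", Kind.LOCAL, true),     // first barrier, arrived without announcement
                new Channel("local-1", Kind.LOCAL, false),    // pending, no announcement: no effect
                new Channel("remote-2", Kind.REMOTE, false)); // pending, announced: can be prioritized
        System.out.println(prioritizable(channels)); // [remote-2]
    }
}
```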
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery()
{
env.enableCheckpointing(10);
env.disableOperatorChaining();
env.getCheckpointConfig().enableUnalignedCheckpoints();
+ env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
env.setRestartStrategy(fixedDelayRestart(1, 0));
env.addSource(new NumberSource(lastCheckpointValue))
- .shuffle()
+ .rebalance()
Review comment:
Perhaps I'm missing something, but what guarantees that all
subtasks will receive records? As far as I can see, the implementation of
ForwardPartitioner always returns the first subpartition (`return 0;`).
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery()
{
env.enableCheckpointing(10);
env.disableOperatorChaining();
env.getCheckpointConfig().enableUnalignedCheckpoints();
+ env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
env.setRestartStrategy(fixedDelayRestart(1, 0));
env.addSource(new NumberSource(lastCheckpointValue))
- .shuffle()
+ .rebalance()
Review comment:
> Does your source have lower parallelism than the following map operator?

Actually, it does. I have only one source and three downstream map subtasks. I did it
this way because it is easier to manage the execution when you have only one source.
##########
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery()
{
env.enableCheckpointing(10);
env.disableOperatorChaining();
env.getCheckpointConfig().enableUnalignedCheckpoints();
+ env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
env.setRestartStrategy(fixedDelayRestart(1, 0));
env.addSource(new NumberSource(lastCheckpointValue))
- .shuffle()
+ .rebalance()
Review comment:
Yes, I am sure. It happens because my source implements `SourceFunction`, which
always has parallelism 1, rather than `ParallelSourceFunction`, which uses the
configured parallelism.
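Here is a plain-Java sketch of that rule as I understand Flink's behaviour: a source whose function is not a parallel source runs with parallelism 1, and explicitly requesting more is rejected. `SourceFn`, `ParallelSourceFn` and `effectiveParallelism` are hypothetical stand-ins, not Flink's `SourceFunction`/`ParallelSourceFunction` types, and the exception message is made up.

```java
/** Hypothetical model of how a source's parallelism is resolved; not Flink code. */
public class SourceParallelismSketch {

    /** Stand-in for a non-parallel source function type. */
    public interface SourceFn {}

    /** Stand-in for a parallel source function type. */
    public interface ParallelSourceFn extends SourceFn {}

    public static final class NumberSource implements SourceFn {}           // like the test's source

    public static final class ParallelNumberSource implements ParallelSourceFn {}

    /**
     * Parallelism the source would run with. requested <= 0 means "not set explicitly",
     * in which case a parallel source falls back to the environment default.
     */
    public static int effectiveParallelism(SourceFn source, int requested, int envDefault) {
        boolean isParallel = source instanceof ParallelSourceFn;
        if (!isParallel) {
            if (requested > 1) {
                throw new IllegalArgumentException("hypothetical check: non-parallel source must have parallelism 1");
            }
            return 1; // environment default is ignored for non-parallel sources
        }
        return requested > 0 ? requested : envDefault;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(new NumberSource(), 0, 3));         // 1
        System.out.println(effectiveParallelism(new ParallelNumberSource(), 0, 3)); // 3
    }
}
```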
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -117,12 +118,13 @@ private boolean executeIgnoreInFlightDataDuringRecovery()
{
env.enableCheckpointing(10);
env.disableOperatorChaining();
env.getCheckpointConfig().enableUnalignedCheckpoints();
+ env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
env.setRestartStrategy(fixedDelayRestart(1, 0));
env.addSource(new NumberSource(lastCheckpointValue))
- .shuffle()
+ .rebalance()
Review comment:
Ok, I see it works pretty well even without `rebalance`
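This may explain it, hedged: as far as I know, when no partitioning is set, Flink's `StreamGraph` picks `ForwardPartitioner` only if upstream and downstream parallelism are equal and falls back to `RebalancePartitioner` otherwise. An explicit forward between operators with different parallelism is rejected. With one source and three maps, the default is therefore already rebalance. The following plain-Java model of that rule uses hypothetical names and has no Flink dependency.

```java
/** Hypothetical model of how a partitioner is chosen for an edge between two operators. */
public class DefaultPartitionerSketch {

    public enum Partitioner { FORWARD, REBALANCE, SHUFFLE }

    /** requested == null models "no partitioning set in the job". */
    public static Partitioner resolve(Partitioner requested, int upstreamParallelism, int downstreamParallelism) {
        boolean sameParallelism = upstreamParallelism == downstreamParallelism;
        if (requested == null) {
            // Default: forward only works 1:1, so differing parallelism falls back to round-robin.
            return sameParallelism ? Partitioner.FORWARD : Partitioner.REBALANCE;
        }
        if (requested == Partitioner.FORWARD && !sameParallelism) {
            throw new UnsupportedOperationException("forward requires equal parallelism");
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 1, 3));                // REBALANCE (the test's setup)
        System.out.println(resolve(null, 3, 3));                // FORWARD
        System.out.println(resolve(Partitioner.SHUFFLE, 1, 3)); // SHUFFLE
    }
}
```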
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]