pnowojski commented on a change in pull request #15313:
URL: https://github.com/apache/flink/pull/15313#discussion_r598709748
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -126,69 +202,38 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
         if (currentCheckpointId > barrierId
                 || (currentCheckpointId == barrierId && !isCheckpointPending())) {
-            controller.obsoleteBarrierReceived(channelInfo, barrier);
Review comment:
What happened to this call? It served an important purpose: resuming
(unblocking) the network channel when the obsolete barrier was an aligned
one.
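
To illustrate the behaviour I mean, here is a minimal, self-contained sketch. All types below (`Barrier`, `Gate`, `ObsoleteBarrierSketch`, `dropIfObsolete`) are hypothetical stand-ins and not the real Flink classes; the sketch only assumes that an aligned barrier has blocked its channel, so the channel has to be resumed even when the barrier itself is dropped as obsolete.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: every type here is a hypothetical stand-in, not a Flink class. */
final class ObsoleteBarrierSketch {

    /** Hypothetical minimal barrier: an id plus whether it was sent in aligned mode. */
    static final class Barrier {
        final long id;
        final boolean aligned;

        Barrier(long id, boolean aligned) {
            this.id = id;
            this.aligned = aligned;
        }
    }

    /** Hypothetical input gate that only records which channels were resumed. */
    static final class Gate {
        final List<Integer> resumedChannels = new ArrayList<>();

        void resumeConsumption(int channel) {
            resumedChannels.add(channel);
        }
    }

    private final Gate gate;
    private final long currentCheckpointId;
    private final boolean checkpointPending;

    ObsoleteBarrierSketch(Gate gate, long currentCheckpointId, boolean checkpointPending) {
        this.gate = gate;
        this.currentCheckpointId = currentCheckpointId;
        this.checkpointPending = checkpointPending;
    }

    /** Returns true if the barrier was obsolete and has been dropped. */
    boolean dropIfObsolete(Barrier barrier, int channel) {
        boolean obsolete =
                currentCheckpointId > barrier.id
                        || (currentCheckpointId == barrier.id && !checkpointPending);
        if (!obsolete) {
            return false;
        }
        // Assumption: an aligned barrier blocked its channel, so the channel must be
        // resumed even though the barrier itself is ignored.
        if (barrier.aligned) {
            gate.resumeConsumption(channel);
        }
        return true;
    }
}
```

If the early `return` skips the resume step, a channel blocked by an obsolete aligned barrier would presumably stay blocked.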
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -126,69 +202,38 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
         if (currentCheckpointId > barrierId
                 || (currentCheckpointId == barrierId && !isCheckpointPending())) {
-            controller.obsoleteBarrierReceived(channelInfo, barrier);
             return;
         }
-        checkSubsumedCheckpoint(barrier);
+        checkNewCheckpoint(barrier);
-        if (numBarriersReceived == 0) {
+        if (numBarriersReceived++ == 0) {
Review comment:
nit: ideally, moving this increment should also go into a separate commit,
to keep this large refactoring commit as small as possible.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -365,4 +409,88 @@ public String toString() {
                 "%s: current checkpoint: %d, current barriers: %d, open channels: %d",
                 taskName, currentCheckpointId, numBarriersReceived, numOpenChannels);
     }
+
+    class ContextImpl implements BarrierHandleState.Context {
+        private final Map<InputChannelInfo, Integer> sequenceNumberInAnnouncedChannels =
+                new HashMap<>();
+
+        private final Set<InputChannelInfo> blockedChannels = new HashSet<>();
Review comment:
Maybe those state fields should be moved to `BarrierHandleState`, while
`ContextImpl` would become a stateless class - just a proxy for action calls?
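
A rough sketch of the split I have in mind, using hypothetical names (`StatelessContextSketch`, `WaitingForBarriers`, plain `int` channel ids) rather than the actual types from this PR: the state object owns the bookkeeping, and the context only exposes actions.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: hypothetical names, not the classes from this PR. */
final class StatelessContextSketch {

    /** Hypothetical action-only context: it has no fields of its own. */
    interface Context {
        void resumeConsumption(int channel);
    }

    /** Hypothetical state that owns the blocked-channel bookkeeping. */
    static final class WaitingForBarriers {
        private final Set<Integer> blockedChannels = new HashSet<>();

        /** Remember that this channel is blocked by an aligned barrier. */
        void barrierReceived(int channel) {
            blockedChannels.add(channel);
        }

        /** Unblock everything through the context, then forget the channels. */
        void allBarriersReceived(Context context) {
            for (int channel : blockedChannels) {
                context.resumeConsumption(channel);
            }
            blockedChannels.clear();
        }

        int blockedCount() {
            return blockedChannels.size();
        }
    }
}
```

With this shape the context could be a thin proxy (even a lambda in tests), and the lifetime of the per-checkpoint fields would follow the lifetime of the state object.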
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -126,69 +202,38 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
         if (currentCheckpointId > barrierId
                 || (currentCheckpointId == barrierId && !isCheckpointPending())) {
-            controller.obsoleteBarrierReceived(channelInfo, barrier);
             return;
         }
-        checkSubsumedCheckpoint(barrier);
+        checkNewCheckpoint(barrier);
-        if (numBarriersReceived == 0) {
+        if (numBarriersReceived++ == 0) {
             if (getNumOpenChannels() == 1) {
                 markAlignmentStartAndEnd(barrier.getTimestamp());
             } else {
                 markAlignmentStart(barrier.getTimestamp());
             }
             allBarriersReceivedFuture = new CompletableFuture<>();
-
-            if (!handleBarrier(barrier, channelInfo, controller::preProcessFirstBarrier)) {
-                return;
-            }
-        }
-
-        if (!handleBarrier(barrier, channelInfo, controller::barrierReceived)) {
-            return;
         }
-        if (currentCheckpointId == barrierId) {
Review comment:
What happened to this check: `if (currentCheckpointId == barrierId) {`?
I'm not sure whether it was important, but I noticed it has disappeared.
Was it a no-op? If so, it would probably be better to remove it in a
separate small hotfix/refactor commit.