pnowojski commented on code in PR #27182:
URL: https://github.com/apache/flink/pull/27182#discussion_r2524099004
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -153,6 +157,20 @@ public void assignStates() {
}
}
+ // distribute output channel states to downstream tasks if needed
+ // Note: it has to be called after assignAttemptState for all tasks since the
+ // redistributing of result subpartition states depend on the inputSubtaskMappings
+ // of downstream tasks.
+ if (unalignedAllowOnRecovery) {
+ for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
+ // If unalignedAllowOnRecovery is enabled, all upstream output buffers have to be
+ // distributed to downstream since the upstream task side doesn’t deserialize
+ // records generally. It is easy to filter records and re-upload records if
+ // recovering output buffers on downstream task side directly.
+ stateAssignment.distributeOutputBuffersToDownstream();
+ }
+ }
Review Comment:
nit: this method is growing a bit too large, being separated via comments
into distinct parts. Maybe we could refactor it and extract those parts into
smaller methods? Like `repartitionState(...)`, `assignState(...)`?
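
A minimal sketch of the structure suggested above, assuming the method can be split along its existing comment boundaries. The class `AssignStatesSketch` and the method `distributeOutputBuffers` are hypothetical names for illustration; `repartitionState` and `assignState` follow the names proposed in the comment, and the bodies are placeholders rather than Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the suggested structure; not Flink code.
final class AssignStatesSketch {

    // Records which phase ran, so the ordering is visible.
    final List<String> phases = new ArrayList<>();
    private final boolean unalignedAllowOnRecovery;

    AssignStatesSketch(boolean unalignedAllowOnRecovery) {
        this.unalignedAllowOnRecovery = unalignedAllowOnRecovery;
    }

    // The top-level method only names the phases and fixes their order.
    void assignStates() {
        repartitionState();
        assignState();
        if (unalignedAllowOnRecovery) {
            // Runs last: the diff comment says distribution must follow
            // state assignment for all tasks.
            distributeOutputBuffers();
        }
    }

    // Placeholder bodies; the real methods would hold the extracted logic.
    private void repartitionState() {
        phases.add("repartitionState");
    }

    private void assignState() {
        phases.add("assignState");
    }

    private void distributeOutputBuffers() {
        phases.add("distributeOutputBuffers");
    }

    public static void main(String[] args) {
        AssignStatesSketch sketch = new AssignStatesSketch(true);
        sketch.assignStates();
        System.out.println(sketch.phases);
    }
}
```

With this shape, the ordering constraint lives in one short method instead of being spread across comments inside a long one.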
##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -646,6 +646,15 @@ public class CheckpointingOptions {
+ "It can reduce the number of small files when enable unaligned checkpoint. "
+ "Each subtask will create a new channel state file when this is configured to 1.");
+ @Experimental
+ public static final ConfigOption<Boolean> UNALIGNED_ALLOW_ON_RECOVERY =
+ ConfigOptions.key("execution.checkpointing.unaligned.allow-on-recovery")
Review Comment:
nit: `execution.checkpointing.unaligned.during-recovery.enabled`?
Also, given that you want to provide this feature in two steps:
- recover output buffers on the input side
- support checkpointing during recovery

would it be useful to have two separate feature flags for that? 🤔 (I'm not
sure)
##########
flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java:
##########
@@ -95,6 +96,9 @@ public void before() throws Exception {
Configuration configuration = new Configuration();
configuration.set(CACHE_IDLE_TIMEOUT, Duration.ofDays(365)); // cache forever
+ // Disable UNALIGNED_ALLOW_ON_RECOVERY since the output buffer states file may be opened
+ // from multiple subtasks
+ configuration.set(UNALIGNED_ALLOW_ON_RECOVERY, false);
Review Comment:
Can you enable this for randomisation in other ITCases? See
`org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration`.
Also, please make sure that this flag/code path (with non-empty output data) is
actually being exercised in our `UnalignedCheckpoint*****ITCase` tests (there
are 6 files testing various scenarios).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -99,7 +100,12 @@ public void recover(
Buffer buffer = bufferWithContext.context;
try {
if (buffer.readableBytes() > 0) {
- for (final RecoveredInputChannel channel : getMappedChannels(channelInfo)) {
+ List<RecoveredInputChannel> mappedChannels = getMappedChannels(channelInfo);
+ checkState(
+ mappedChannels.size() == 1,
+ "One buffer is only distributed to one target InputChannel since "
+ + "one buffer is expected to be processed once by the same task.");
+ for (final RecoveredInputChannel channel : mappedChannels) {
Review Comment:
I'm not following this change. 🤔 Has this invariant/limitation always been
present in the code, or is it something new?
Why do we assert that the list has exactly one element and then still have
code that loops over all the elements? Shouldn't we use something like
`Iterables.getOnlyElement`?
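
A minimal sketch of the single-element access suggested above, using only the JDK. `onlyElement` is a hypothetical stand-in for Guava's `Iterables.getOnlyElement`, and the channel is a placeholder string rather than Flink's `RecoveredInputChannel`; the point is only that the invariant and the access can be expressed in one call instead of an assertion followed by a loop.

```java
import java.util.List;

// Hypothetical illustration; not code from the PR.
final class OnlyElementSketch {

    // Stand-in for Guava's Iterables.getOnlyElement: returns the single
    // element, or fails if the list does not contain exactly one.
    static <T> T onlyElement(List<T> items) {
        if (items.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one element but found " + items.size());
        }
        return items.get(0);
    }

    public static void main(String[] args) {
        // Placeholder for the result of getMappedChannels(channelInfo).
        List<String> mappedChannels = List.of("channel-0");
        String channel = onlyElement(mappedChannels);
        System.out.println("Recovering buffer into " + channel);
    }
}
```

Note that the exception type here is a choice made for the sketch: Guava's method throws `NoSuchElementException` for an empty iterable and `IllegalArgumentException` for more than one element, so the failure mode would differ from the `checkState` call in the diff.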
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java:
##########
@@ -540,6 +580,75 @@ public boolean hasInFlightDataForResultPartition(int partitionIndex) {
return false;
}
+ public int findInputGateIdxForResultPartition(int partitionIndex) {
Review Comment:
nit: move this method below `distributeOutputBuffersToDownstream`, as it is
used only there. It is also a bit odd that this method is public while the
higher-level method `distributeOutputBuffersToDownstream`, which is the one
actually called from outside, is package-private. Make it private?