[ https://issues.apache.org/jira/browse/FLINK-38267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Fan updated FLINK-38267:
----------------------------
    Description: 
h1. 1. Phenomenon

The job cannot be recovered from an unaligned checkpoint (UC) after rescaling. It fails with the following exception:
{code:java}
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().{code}
UC is the abbreviation of unaligned checkpoint in this ticket.
h1. 2. Reason
h2. 2.1 What types of jobs trigger this bug?

The bug is triggered when an upstream task has multiple output exchanges that include both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).

It is also triggered when a downstream task has multiple input exchanges that include both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
h2. 2.2 Why does this bug happen?

When a job is rescaled and recovered from an unaligned checkpoint, Flink needs to redistribute in-flight buffers (input buffers on the downstream side and output buffers on the upstream side).

The ForwardPartitioner and RescalePartitioner exchanges do not support unaligned checkpoints, so they are not expected to run the redistribution logic. In the code implementation:
 * For input buffer redistribution[1], if the current task has no input buffer state and the upstream task has no output buffer state, the code returns directly without any redistribution.
 * For output buffer redistribution[2], if the current task has no output buffer state and the downstream task has no input buffer state, the code returns directly without any redistribution.

However, this check does not work when the upstream task has multiple output exchanges. The following DAG example has 3 tasks and 2 exchanges (hash and forward):
 * The hash exchange supports unaligned checkpoints.
 * The forward exchange does not support unaligned checkpoints.

!image-2025-08-19-13-58-15-029.png|width=786,height=421!
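To make the early-return condition above concrete, here is a minimal sketch that models the DAG from the figure in plain Java (Java 16+ for records). It is *not* Flink code: the {{RedistributionCheckSketch}} class, the {{Exchange}} record and the {{perTaskCheck}}, {{perExchangeCheck}} and {{redistribute}} methods are hypothetical, and each exchange is assumed to carry a single flag saying whether the checkpoint captured in-flight buffers for it. The per-task check looks at all exchanges of the upstream task, so it fires for the forward edge because the Source has output state from the hash edge; a per-exchange check only looks at the edge being redistributed.
{code:java}
import java.util.List;

/**
 * Hypothetical, simplified model of the DAG in the figure (NOT Flink's real classes):
 * Source --hash--> "Map after hash", Source --forward--> "Map after forward".
 */
public class RedistributionCheckSketch {

    /** One edge between two tasks; inFlightState = the UC captured buffers for this edge (assumption). */
    record Exchange(String partitioner, String upstream, String downstream, boolean inFlightState) {
        boolean supportsUnalignedRescale() {
            // forward and rescale are pointwise and cannot be redistributed
            return !partitioner.equals("forward") && !partitioner.equals("rescale");
        }
    }

    /** Per-task check: any in-flight state on the downstream's inputs or the upstream's outputs. */
    static boolean perTaskCheck(List<Exchange> all, Exchange edge) {
        boolean downstreamHasInputState = all.stream()
                .filter(e -> e.downstream().equals(edge.downstream()))
                .anyMatch(Exchange::inFlightState);
        boolean upstreamHasOutputState = all.stream()
                .filter(e -> e.upstream().equals(edge.upstream()))
                .anyMatch(Exchange::inFlightState);
        return downstreamHasInputState || upstreamHasOutputState;
    }

    /** Per-exchange check: only the state of the edge being redistributed matters. */
    static boolean perExchangeCheck(Exchange edge) {
        return edge.inFlightState();
    }

    /** Simulated redistribution for one edge; throws for pointwise partitioners. */
    static void redistribute(Exchange edge, boolean shouldRedistribute) {
        if (!shouldRedistribute) {
            return; // early return: nothing to redistribute for this edge
        }
        if (!edge.supportsUnalignedRescale()) {
            throw new UnsupportedOperationException(
                    "Cannot rescale the given pointwise partitioner.");
        }
        // a real implementation would reassign the in-flight buffers here
    }

    public static void main(String[] args) {
        Exchange hash = new Exchange("hash", "Source", "Map after hash", true);
        Exchange forward = new Exchange("forward", "Source", "Map after forward", false);
        List<Exchange> dag = List.of(hash, forward);

        // prints "per-task check for forward: true"
        System.out.println("per-task check for forward: " + perTaskCheck(dag, forward));
        // prints "per-exchange check for forward: false"
        System.out.println("per-exchange check for forward: " + perExchangeCheck(forward));

        try {
            redistribute(forward, perTaskCheck(dag, forward));
        } catch (UnsupportedOperationException e) {
            System.out.println("per-task: " + e.getMessage());
        }
        redistribute(forward, perExchangeCheck(forward)); // skipped, no exception
        System.out.println("per-exchange: forward skipped");
    }
}
{code}
Only the per-task variant reaches the simulated pointwise-partitioner exception for the forward edge, while the hash edge is still redistributed under both checks.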
When the job is recovered from a UC after rescaling, *Map after forward* checks its own input buffer state and the Source's output buffer state. In this case the Source task does have output buffer state, but that state belongs to the hash exchange, not the forward exchange. As a result, redistribution is unexpectedly triggered for *Map after forward*.

Likewise, from the perspective of the upstream task (Source), which has 2 output exchanges, the forward exchange should not trigger the rescale logic even if the hash exchange has state.
h2. 2.3 Reproduce

The following job can easily reproduce this bug.
{code:java}
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * It could reproduce this issue:
 * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
 * Did you change the partitioner to forward or rescale?
 * It may also help to add an explicit shuffle().
 */
public class UnalignedCheckpointBugDemo {

    private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port", "12348");
        conf.setString("execution.checkpointing.unaligned.enabled", "true");
        conf.setString("execution.checkpointing.interval", "10s");
        conf.setString("execution.checkpointing.min-pause", "8s");
        conf.setString("jobmanager.scheduler", "adaptive");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.disableOperatorChaining();
        env.setParallelism(5);

        SingleOutputStreamOperator<String> stream1 = env.fromSource(
                new DataGeneratorSource<>(
                        value -> new RandomDataGenerator().nextHexString(300),
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(100000),
                        Types.STRING),
                WatermarkStrategy.noWatermarks(),
                "Source Task");

        // Hash exchange (keyBy): supports unaligned checkpoints.
        stream1
                .keyBy(new KeySelectorFunction())
                .map(x -> {
                    Thread.sleep(50);
                    return x;
                }).name("Map after hash");

        // Forward exchange (same parallelism, chaining disabled): does not support unaligned checkpoints.
        stream1.map(x -> {
            Thread.sleep(5);
            return x;
        }).name("Map after forward");

        env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
    }

    private static class KeySelectorFunction implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String value) throws Exception {
            return 0;
        }
    }
}
{code}
h1. 3. Solution

The implemented solution makes the state redistribution logic more granular by checking for in-flight data on a *per-exchange* basis instead of a per-task basis.
 # *Precise State Tracking:* The {{TaskStateAssignment}} class was refactored to no longer use a simple boolean flag. It now tracks exactly which input gates and result partitions contain in-flight data.
 # *Per-Channel/Partition Checks:* The core redistribution methods, {{reDistributeInputChannelStates}} and {{reDistributeResultSubpartitionStates}}, were modified. Their internal logic now iterates through each input gate or output partition and uses new helper methods ({{hasInFlightDataForInputGate}} and {{hasInFlightDataForResultPartition}}) to check whether that _specific channel_ has state.
 # *Conditional Logic:* The state redistribution logic is now wrapped in a conditional block and is only invoked for a channel if the per-exchange check passes. This ensures that stateless exchanges (like {{forward}} or {{rescale}}) are correctly skipped, avoiding the exception.

This approach fixes the bug by applying the redistribution logic only where it is actually needed, allowing jobs with mixed partitioner types to rescale from an unaligned checkpoint successfully.

[1] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
[2] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]

was:
h1. 1. Phenomenon

The job cannot be recovered from an unaligned checkpoint (UC) after rescaling. It fails with the following exception:
{code:java}
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().{code}
UC is the abbreviation of unaligned checkpoint in this ticket.
h1. 2. Reason
h2. 2.1 What types of jobs trigger this bug?

The bug is triggered when an upstream task has multiple output exchanges that include both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).

It is also triggered when a downstream task has multiple input exchanges that include both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
h2. 2.2 Why does this bug happen?
When a job is rescaled and recovered from an unaligned checkpoint, Flink needs to redistribute in-flight buffers (input buffers on the downstream side and output buffers on the upstream side).

The ForwardPartitioner and RescalePartitioner exchanges do not support unaligned checkpoints, so they are not expected to run the redistribution logic. In the code implementation:
 * For input buffer redistribution[1], if the current task has no input buffer state and the upstream task has no output buffer state, the code returns directly without any redistribution.
 * For output buffer redistribution[2], if the current task has no output buffer state and the downstream task has no input buffer state, the code returns directly without any redistribution.

However, this check does not work when the upstream task has multiple output exchanges. The following DAG example has 3 tasks and 2 exchanges (hash and forward):
 * The hash exchange supports unaligned checkpoints.
 * The forward exchange does not support unaligned checkpoints.

!image-2025-08-19-13-58-15-029.png|width=786,height=421!

When the job is recovered from a UC after rescaling, *Map after forward* checks its own input buffer state and the Source's output buffer state. In this case the Source task does have output buffer state, but that state belongs to the hash exchange, not the forward exchange. As a result, redistribution is unexpectedly triggered for *Map after forward*.
h2. 2.3 Reproduce

The following job can easily reproduce this bug.
{code:java}
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * It could reproduce this issue:
 * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
 * Did you change the partitioner to forward or rescale?
 * It may also help to add an explicit shuffle().
 */
public class UnalignedCheckpointBugDemo {

    private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port", "12348");
        conf.setString("execution.checkpointing.unaligned.enabled", "true");
        conf.setString("execution.checkpointing.interval", "10s");
        conf.setString("execution.checkpointing.min-pause", "8s");
        conf.setString("jobmanager.scheduler", "adaptive");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.disableOperatorChaining();
        env.setParallelism(5);

        SingleOutputStreamOperator<String> stream1 = env.fromSource(
                new DataGeneratorSource<>(
                        value -> new RandomDataGenerator().nextHexString(300),
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(100000),
                        Types.STRING),
                WatermarkStrategy.noWatermarks(),
                "Source Task");

        // Hash exchange (keyBy): supports unaligned checkpoints.
        stream1
                .keyBy(new KeySelectorFunction())
                .map(x -> {
                    Thread.sleep(50);
                    return x;
                }).name("Map after hash");

        // Forward exchange (same parallelism, chaining disabled): does not support unaligned checkpoints.
        stream1.map(x -> {
            Thread.sleep(5);
            return x;
        }).name("Map after forward");

        env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
    }

    private static class KeySelectorFunction implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String value) throws Exception {
            return 0;
        }
    }
}
{code}
h1. 3. Solution

When the current task checks whether the upstream task has output buffer state, it only checks the corresponding exchange instead of all exchanges.

When the current task checks whether the downstream task has input buffer state, it only checks the corresponding exchange instead of all exchanges.

[1] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
[2] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]

> Job cannot be recovered from unaligned checkpoint after rescaling when one
> task has multiple exchanges
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38267
>                 URL: https://issues.apache.org/jira/browse/FLINK-38267
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.20.2, 2.1.0, 2.2.0
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2025-08-19-13-58-15-029.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)