Rui Fan created FLINK-38267:
-------------------------------

Summary: Job cannot be recovered from unaligned checkpoint after rescaling when one task has multiple exchanges
Key: FLINK-38267
URL: https://issues.apache.org/jira/browse/FLINK-38267
Project: Flink
Issue Type: Bug
Reporter: Rui Fan
Attachments: image-2025-08-19-13-58-15-029.png
h1. 1. Phenomenon

The job cannot be recovered from an unaligned checkpoint (UC) after rescaling, and fails with the following exception:
{code:java}
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
{code}
In this ticket, UC is the abbreviation of unaligned checkpoint.

h1. 2. Reason
h2. 2.1 What types of jobs trigger this bug?

The bug is triggered when either of the following holds:
* One upstream task has multiple output exchanges, including UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
* One downstream task has multiple input exchanges, including UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).

h2. 2.2 Why does this bug happen?

When a job is rescaled and recovered from an unaligned checkpoint, Flink needs to redistribute in-flight buffers (input buffers on the downstream side and output buffers on the upstream side). The ForwardPartitioner and RescalePartitioner exchanges do not support unaligned checkpoints, so they are not expected to run the redistribution logic.

In the code implementation:
* For input buffer redistribution [1], if the current task has no input buffer state and the upstream task has no output buffer state, the code returns directly without any redistribution.
* For output buffer redistribution [2], if the current task has no output buffer state and the downstream task has no input buffer state, the code returns directly without any redistribution.

However, this check does not work when the upstream task has multiple output exchanges, because it looks at the buffer state of the whole task rather than of a single exchange. The following DAG is an example with 3 tasks and 2 exchanges (Hash and Forward):
* The Hash exchange supports unaligned checkpoints.
* The Forward exchange does not support unaligned checkpoints.

!image-2025-08-19-13-58-15-029.png!
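To make the failing check concrete, here is a minimal, hypothetical Java model of the task-level early return described in section 2.2, applied to the DAG above. {{TaskLevelCheckModel}}, {{redistributesInput}} and {{redistribute}} are illustrative names, not Flink classes; the real logic lives in StateAssignmentOperation [1][2] and is considerably more involved. The model assumes buffer state can be reduced to one boolean per exchange, and that the consumer consults the producer's output state across all of its exchanges, which is the behaviour this ticket describes.

{code:java}
import java.util.Map;

/**
 * Hypothetical, heavily simplified model of the task-level check (not Flink code).
 * Buffer state is reduced to "has in-flight buffers or not" per exchange id.
 */
public class TaskLevelCheckModel {

    /** Exchange types mentioned in the ticket and whether UC rescaling supports them. */
    public enum Partitioner {
        HASH(true), REBALANCE(true), FORWARD(false), RESCALE(false);

        final boolean supportsUnalignedRescale;

        Partitioner(boolean supportsUnalignedRescale) {
            this.supportsUnalignedRescale = supportsUnalignedRescale;
        }
    }

    /**
     * Buggy variant: the early return is skipped if the producer has output state on ANY
     * of its exchanges, even one that does not feed this consumer.
     */
    public static boolean redistributesInput(
            boolean consumerHasInputState, Map<String, Boolean> producerOutputStateByExchange) {
        boolean producerHasAnyOutputState =
                producerOutputStateByExchange.values().stream().anyMatch(Boolean::booleanValue);
        // Early return only when neither side has state.
        return consumerHasInputState || producerHasAnyOutputState;
    }

    /** Redistributing over a pointwise (forward/rescale) exchange is what fails. */
    public static void redistribute(Partitioner partitioner) {
        if (!partitioner.supportsUnalignedRescale) {
            throw new UnsupportedOperationException(
                    "Cannot rescale the given pointwise partitioner. "
                            + "Did you change the partitioner to forward or rescale? "
                            + "It may also help to add an explicit shuffle().");
        }
        // Redistribution of in-flight buffers would happen here in a real system.
    }

    public static void main(String[] args) {
        // Source -> Hash -> "Map after hash" and Source -> Forward -> "Map after forward".
        // The Source only holds in-flight output buffers for the Hash exchange.
        Map<String, Boolean> sourceOutputState = Map.of("hash", true, "forward", false);

        // "Map after forward" has no input state of its own, yet the check still fires.
        boolean triggered = redistributesInput(false, sourceOutputState);
        System.out.println("Map after forward redistributes: " + triggered);
        try {
            if (triggered) {
                redistribute(Partitioner.FORWARD);
            }
        } catch (UnsupportedOperationException e) {
            System.out.println("Recovery fails: " + e.getMessage());
        }
    }
}
{code}

Running {{main}} prints that *Map after forward* redistributes and then reports the same exception message as in section 1.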
When the job is recovered from a UC after rescaling, *Map after forward* checks its own input buffer state and the Source's output buffer state. In this case the Source task does have output buffer state, but that state belongs to the Hash exchange, not the Forward exchange. As a result, redistribution is invoked for *Map after forward*, which is unexpected and leads to the exception above.

h2. 2.3 Reproduce

The following job reproduces this bug easily.
{code:java}
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reproduces this issue:
 * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
 * Did you change the partitioner to forward or rescale?
 * It may also help to add an explicit shuffle().
 */
public class UnalignedCheckpointBugDemo {

    private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port", "12348");
        conf.setString("execution.checkpointing.unaligned.enabled", "true");
        conf.setString("execution.checkpointing.interval", "10s");
        conf.setString("execution.checkpointing.min-pause", "8s");
        conf.setString("jobmanager.scheduler", "adaptive");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Without chaining, the forward connection becomes a real network exchange.
        env.disableOperatorChaining();
        env.setParallelism(5);

        SingleOutputStreamOperator<String> stream1 =
                env.fromSource(
                        new DataGeneratorSource<>(
                                value -> new RandomDataGenerator().nextHexString(300),
                                Long.MAX_VALUE,
                                RateLimiterStrategy.perSecond(100000),
                                Types.STRING),
                        WatermarkStrategy.noWatermarks(),
                        "Source Task");

        // Hash exchange with a slow consumer.
        stream1.keyBy(new KeySelectorFunction())
                .map(x -> {
                    Thread.sleep(50);
                    return x;
                })
                .name("Map after hash");

        // Forward exchange from the same Source task.
        stream1.map(x -> {
                    Thread.sleep(5);
                    return x;
                })
                .name("Map after forward");

        env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
    }

    private static class KeySelectorFunction implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String value) throws Exception {
            return 0;
        }
    }
}
{code}

h1. 3. Solution

When the current task checks whether the upstream task has output buffer state, it should only check the corresponding exchange instead of all exchanges. Likewise, when the current task checks whether the downstream task has input buffer state, it should only check the corresponding exchange instead of all exchanges.
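A minimal sketch of the per-exchange check, under the same simplifying assumptions as a toy model (hypothetical names, buffer state reduced to one boolean per exchange id). It is not the actual patch to StateAssignmentOperation; it only shows that keying the lookup by exchange lets *Map after forward* take the early return while *Map after hash* still redistributes.

{code:java}
import java.util.Map;

/**
 * Hypothetical sketch of the per-exchange check (illustrative names, not Flink code).
 */
public class PerExchangeCheckSketch {

    /**
     * Decides whether the in-flight buffers of one specific exchange need redistribution.
     * Only the state recorded for {@code exchangeId} is consulted on both sides.
     */
    public static boolean needsRedistribution(
            String exchangeId,
            Map<String, Boolean> consumerInputStateByExchange,
            Map<String, Boolean> producerOutputStateByExchange) {
        boolean consumerHasState = consumerInputStateByExchange.getOrDefault(exchangeId, false);
        boolean producerHasState = producerOutputStateByExchange.getOrDefault(exchangeId, false);
        return consumerHasState || producerHasState;
    }

    public static void main(String[] args) {
        // Same DAG as above: the Source only has output state for the Hash exchange.
        Map<String, Boolean> sourceOutput = Map.of("hash", true, "forward", false);
        Map<String, Boolean> noInputState = Map.of();

        // "Map after hash" still redistributes the Hash exchange's buffers: prints true.
        System.out.println(needsRedistribution("hash", noInputState, sourceOutput));
        // "Map after forward" ignores the Hash exchange's state and returns early: prints false.
        System.out.println(needsRedistribution("forward", noInputState, sourceOutput));
    }
}
{code}

The output-buffer side would be symmetric in this model: the producer consults its own output state and the consumer's input state for the same exchange only.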
[1] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
[2] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]