Rui Fan created FLINK-38267:
-------------------------------

             Summary: Job cannot be recovered from unaligned checkpoint after 
rescaling when one task has multiple exchanges
                 Key: FLINK-38267
                 URL: https://issues.apache.org/jira/browse/FLINK-38267
             Project: Flink
          Issue Type: Bug
            Reporter: Rui Fan
         Attachments: image-2025-08-19-13-58-15-029.png

h1. 1. Phenomenon:

A job cannot be recovered from an unaligned checkpoint (UC) after rescaling. Recovery fails with the following exception:

{code:java}
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
{code}

 

UC is the abbreviation of unaligned checkpoint in this ticket.
h1. 2. Reason
h2. 2.1 What types of jobs trigger this bug?

The bug is triggered when one upstream task has multiple output exchanges that include both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).

It is also triggered when one downstream task has multiple input exchanges that include both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
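
The triggering condition can be sketched as a small predicate. This is only an illustration: the class {{MixedExchangeCheckSketch}}, the enum {{ExchangeType}} and the method {{isAffected}} are hypothetical names and are not part of Flink.

{code:java}
import java.util.Arrays;
import java.util.List;

public class MixedExchangeCheckSketch {

    /** Hypothetical exchange types, flagged by UC support as described in this ticket. */
    enum ExchangeType {
        HASH(true), REBALANCE(true), FORWARD(false), RESCALE(false);

        final boolean supportsUnalignedCheckpoint;

        ExchangeType(boolean supportsUnalignedCheckpoint) {
            this.supportsUnalignedCheckpoint = supportsUnalignedCheckpoint;
        }
    }

    /** True if the exchanges on one side of a task mix UC-supported and UC-unsupported types. */
    static boolean isAffected(List<ExchangeType> exchangesOnOneSide) {
        boolean anySupported = false;
        boolean anyUnsupported = false;
        for (ExchangeType type : exchangesOnOneSide) {
            if (type.supportsUnalignedCheckpoint) {
                anySupported = true;
            } else {
                anyUnsupported = true;
            }
        }
        return anySupported && anyUnsupported;
    }

    public static void main(String[] args) {
        // Hash + forward on the same task: affected.
        System.out.println(isAffected(Arrays.asList(ExchangeType.HASH, ExchangeType.FORWARD)));
        // Only UC-supported exchanges: not affected.
        System.out.println(isAffected(Arrays.asList(ExchangeType.HASH, ExchangeType.REBALANCE)));
    }
}
{code}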
h2. 2.2 Why does this bug happen?

When a job is rescaled and recovered from an unaligned checkpoint, Flink needs to redistribute in-flight buffers (input buffers on the downstream side and output buffers on the upstream side).

Exchanges that use ForwardPartitioner or RescalePartitioner do not support unaligned checkpoints, so the redistribution logic is not expected to run for them. In the code:
 * For input buffer redistribution[1], if the current task has no input buffer state and the upstream task has no output buffer state, the code returns directly without any redistribution.

 * For output buffer redistribution[2], if the current task has no output buffer state and the downstream task has no input buffer state, the code returns directly without any redistribution.

However, this check does not work when an upstream task has multiple output exchanges. The following DAG is an example with 3 tasks and 2 exchanges (Hash and Forward).
 * The Hash exchange supports unaligned checkpoints

 * The Forward exchange does not support unaligned checkpoints

 

!image-2025-08-19-13-58-15-029.png!

When the job is recovered from a UC after rescaling, the *Map after forward* task checks its own input buffer state and the Source’s output buffer state. In this case the Source task does have output buffer state, but that state belongs to the Hash exchange, not the Forward exchange.

As a result, redistribution is invoked for {*}Map after forward{*}, which is unexpected and leads to the exception shown in section 1.
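
The false positive described above can be modelled in a few lines. This is a simplified sketch, not the code in StateAssignmentOperation: the class {{TaskLevelCheckSketch}} and all names in it are hypothetical, and buffer state is reduced to a boolean per exchange.

{code:java}
import java.util.Arrays;
import java.util.List;

public class TaskLevelCheckSketch {

    /** Hypothetical model of one output exchange of the upstream (Source) task. */
    static final class OutputExchange {
        final String name;
        final boolean hasOutputBufferState;

        OutputExchange(String name, boolean hasOutputBufferState) {
            this.name = name;
            this.hasOutputBufferState = hasOutputBufferState;
        }
    }

    /** Task-level view: true if ANY output exchange of the upstream task holds buffer state. */
    static boolean upstreamTaskHasOutputState(List<OutputExchange> upstreamOutputs) {
        for (OutputExchange exchange : upstreamOutputs) {
            if (exchange.hasOutputBufferState) {
                return true;
            }
        }
        return false;
    }

    /** Redistribution is skipped only when neither side has buffer state. */
    static boolean redistributes(boolean ownInputState, List<OutputExchange> upstreamOutputs) {
        return ownInputState || upstreamTaskHasOutputState(upstreamOutputs);
    }

    public static void main(String[] args) {
        // Source has in-flight output buffers on the hash exchange only.
        List<OutputExchange> sourceOutputs = Arrays.asList(
                new OutputExchange("hash", true),
                new OutputExchange("forward", false));
        // "Map after forward" has no input buffer state of its own, yet the
        // task-level check still decides to redistribute.
        System.out.println(redistributes(false, sourceOutputs)); // prints "true"
    }
}
{code}

In this model the forward edge itself carries no buffer state, but the decision is made from the state of the whole Source task, so the hash exchange's buffers cause redistribution to run for the forward edge.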

 
h2. 2.3 Reproduce

The following job can reproduce this bug easily.

 
{code:java}
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reproduces this issue:
 * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
 * Did you change the partitioner to forward or rescale?
 * It may also help to add an explicit shuffle().
 */
public class UnalignedCheckpointBugDemo {
    private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port", "12348");
        conf.setString("execution.checkpointing.unaligned.enabled", "true");
        conf.setString("execution.checkpointing.interval", "10s");
        conf.setString("execution.checkpointing.min-pause", "8s");
        conf.setString("jobmanager.scheduler", "adaptive");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.disableOperatorChaining();
        env.setParallelism(5);
        SingleOutputStreamOperator<String> stream1 = env.fromSource(
                        new DataGeneratorSource<>(
                                value -> new RandomDataGenerator().nextHexString(300),
                                Long.MAX_VALUE,
                                RateLimiterStrategy.perSecond(100000),
                                Types.STRING),
                        WatermarkStrategy.noWatermarks(),
                        "Source Task");
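        // Hash exchange (keyBy), which supports unaligned checkpoints. The 50 ms
        // sleep slows this branch down, so in-flight buffers are likely to exist
        // when a checkpoint is taken.
        stream1
                .keyBy(new KeySelectorFunction())
                .map(x -> {
                    Thread.sleep(50);
                    return x;
                }).name("Map after hash");
        // No keyBy here: with equal parallelism this edge is a forward exchange,
        // which does not support unaligned checkpoints.
        stream1.map(x -> {
                    Thread.sleep(5);
                    return x;
                }).name("Map after forward");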
        env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
    }

    private static class KeySelectorFunction implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String value) throws Exception {
            return 0;
        }
    }
}
{code}
 
h1. 3. Solution

When the current task checks whether the upstream task has output buffer state, it should check only the corresponding exchange instead of all exchanges.

When the current task checks whether the downstream task has input buffer state, it should check only the corresponding exchange instead of all exchanges.
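
A minimal sketch of the proposed per-exchange check follows. It is an illustration under simplifying assumptions, not the actual patch: the class {{PerExchangeCheckSketch}} and its method are hypothetical, and buffer state is modelled as a boolean keyed by exchange name.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class PerExchangeCheckSketch {

    /**
     * Redistribution is needed for an exchange only if that exchange itself has
     * buffer state on the current task's side or on the peer task's side.
     */
    static boolean redistributes(
            boolean ownStateForExchange,
            Map<String, Boolean> peerStateByExchange,
            String exchange) {
        return ownStateForExchange || peerStateByExchange.getOrDefault(exchange, false);
    }

    public static void main(String[] args) {
        // Output buffer state of the Source task, tracked per exchange.
        Map<String, Boolean> sourceOutputState = new LinkedHashMap<>();
        sourceOutputState.put("hash", true);
        sourceOutputState.put("forward", false);

        // "Map after hash" still redistributes the hash exchange's buffers.
        System.out.println(redistributes(false, sourceOutputState, "hash"));    // prints "true"
        // "Map after forward" no longer sees the hash exchange's state.
        System.out.println(redistributes(false, sourceOutputState, "forward")); // prints "false"
    }
}
{code}

Keying the check by exchange keeps the existing early return for forward and rescale edges while hash and rebalance edges on the same task are still redistributed.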

 

[1] 
[https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]

[2] 
[https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
