[ https://issues.apache.org/jira/browse/FLINK-38267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan updated FLINK-38267:
----------------------------
    Description: 
h1. 1. Symptom

After rescaling, the job cannot be recovered from a UC (unaligned checkpoint) and fails with the following exception:
{code:java}
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
{code}

In this ticket, UC stands for unaligned checkpoint.
h1. 2. Reason
h2. 2.1 What types of jobs trigger this bug?

This bug is triggered when either of the following holds:
 * An upstream task has multiple output exchanges, including at least one UC-supported exchange (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
 * A downstream task has multiple input exchanges, including at least one UC-supported exchange (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
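To make the trigger condition concrete, here is a minimal Java sketch that checks whether the exchanges of one task (all of its outputs, or all of its inputs) mix UC-supported and UC-unsupported kinds. The {{Exchange}} enum and {{mixesSupportedAndUnsupported}} method are hypothetical illustrations, not Flink APIs; only the split between hash/rebalance and forward/rescale is taken from this ticket.

{code:java}
import java.util.List;

final class TriggerConditionSketch {

    // Hypothetical exchange kinds; only the unaligned-checkpoint (UC) support flag matters here.
    enum Exchange {
        HASH(true),
        REBALANCE(true),
        FORWARD(false),
        RESCALE(false);

        final boolean supportsUnalignedCheckpoint;

        Exchange(boolean supportsUnalignedCheckpoint) {
            this.supportsUnalignedCheckpoint = supportsUnalignedCheckpoint;
        }
    }

    // True if the given exchanges (all outputs, or all inputs, of one task) contain at least
    // one UC-supported exchange and at least one UC-unsupported exchange.
    static boolean mixesSupportedAndUnsupported(List<Exchange> exchanges) {
        boolean anySupported = false;
        boolean anyUnsupported = false;
        for (Exchange exchange : exchanges) {
            if (exchange.supportsUnalignedCheckpoint) {
                anySupported = true;
            } else {
                anyUnsupported = true;
            }
        }
        return anySupported && anyUnsupported;
    }

    public static void main(String[] args) {
        // A task with one hash output and one forward output matches the trigger condition.
        System.out.println(mixesSupportedAndUnsupported(List.of(Exchange.HASH, Exchange.FORWARD))); // true
        // Only UC-supported exchanges: not affected.
        System.out.println(mixesSupportedAndUnsupported(List.of(Exchange.HASH, Exchange.REBALANCE))); // false
    }
}
{code}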
h2. 2.2 Why does this bug happen?

When a rescaled job is recovered from an unaligned checkpoint, Flink needs to redistribute the in-flight buffers (input buffers on the downstream side and output buffers on the upstream side).
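The following simplified model illustrates what redistribution means for the two kinds of exchanges: for an all-to-all exchange, a new subtask may need in-flight buffers from any old subtask, while a pointwise exchange has no remapping at all and fails with the message reported in this ticket. All names here ({{RedistributionSketch}}, {{Mapping}}, {{oldSubtasksFor}}) are hypothetical; this is a sketch under those assumptions, not Flink's actual implementation.

{code:java}
import java.util.Arrays;
import java.util.stream.IntStream;

final class RedistributionSketch {

    // Hypothetical classification of how in-flight buffers can be remapped after rescaling.
    enum Mapping {
        FULL,        // all-to-all exchanges such as hash or rebalance
        UNSUPPORTED  // pointwise exchanges such as forward or rescale
    }

    // Returns the old subtask indices whose in-flight buffers a new subtask must read.
    // In this sketch a FULL mapping conservatively reads from every old subtask.
    static int[] oldSubtasksFor(Mapping mapping, int oldParallelism) {
        if (mapping == Mapping.UNSUPPORTED) {
            // Same message as the exception reported in this ticket.
            throw new UnsupportedOperationException(
                    "Cannot rescale the given pointwise partitioner.\n"
                            + "Did you change the partitioner to forward or rescale?\n"
                            + "It may also help to add an explicit shuffle().");
        }
        return IntStream.range(0, oldParallelism).toArray();
    }

    public static void main(String[] args) {
        // Rescaling from parallelism 3: a hash exchange can be redistributed.
        System.out.println(Arrays.toString(oldSubtasksFor(Mapping.FULL, 3))); // [0, 1, 2]
        // A forward exchange cannot, so reaching this code path fails the restore.
        try {
            oldSubtasksFor(Mapping.UNSUPPORTED, 3);
        } catch (UnsupportedOperationException e) {
            System.out.println("restore fails: " + e.getMessage());
        }
    }
}
{code}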

Exchanges using ForwardPartitioner or RescalePartitioner do not support unaligned checkpoints, so no redistribution is expected for them. The current implementation handles this with early returns:
 * For input buffer redistribution [1], if the current task has no input buffer state and the upstream task has no output buffer state, the code returns directly without any redistribution.
 * For output buffer redistribution [2], if the current task has no output buffer state and the downstream task has no input buffer state, the code returns directly without any redistribution.

However, this check breaks when the upstream task has multiple output exchanges. The following example DAG has 3 tasks and 2 exchanges (hash and forward):
 * The hash exchange supports unaligned checkpoints.
 * The forward exchange does not support unaligned checkpoints.

 

!image-2025-08-19-13-58-15-029.png|width=786,height=421!

When the job is recovered from a UC after rescaling, *Map after forward* checks its own input buffer state and the Source's output buffer state. In this case the Source task does have output buffer state, but that state belongs to the hash exchange, not the forward exchange.

As a result, redistribution is unexpectedly triggered for *Map after forward*, which fails with the exception above because a forward exchange cannot be rescaled.
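The following hypothetical Java model (not the actual {{StateAssignmentOperation}} code; every name is illustrative) replays the example DAG above: the Source has output buffer state only for the hash exchange, yet a check that looks at all of the Source's exchanges reports state for the forward exchange as well, so the early return is skipped for *Map after forward*.

{code:java}
import java.util.Map;

final class BuggyCheckSketch {

    // Output buffer state of the Source after the checkpoint, keyed by output exchange.
    // Only the hash exchange carries in-flight buffers; the forward exchange does not support UC.
    static final Map<String, Boolean> SOURCE_OUTPUT_STATE = Map.of("hash", true, "forward", false);

    // Buggy check: asks whether the upstream task has output buffer state on ANY of its exchanges.
    static boolean upstreamHasOutputState(Map<String, Boolean> upstreamOutputState) {
        return upstreamOutputState.containsValue(true);
    }

    // Redistribution is skipped only if neither side has state (the early return described above).
    static boolean redistributesInput(boolean currentHasInputState, Map<String, Boolean> upstreamOutputState) {
        return currentHasInputState || upstreamHasOutputState(upstreamOutputState);
    }

    public static void main(String[] args) {
        // "Map after forward" has no input buffer state: its only input is the forward exchange.
        boolean mapAfterForwardHasInputState = false;
        // The Source's hash-exchange state still makes the check report "has state", so
        // redistribution runs for the forward exchange and the restore fails.
        System.out.println(redistributesInput(mapAfterForwardHasInputState, SOURCE_OUTPUT_STATE)); // true
    }
}
{code}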

 
h2. 2.3 How to reproduce

The following job easily reproduces this bug once it is rescaled and recovered from an unaligned checkpoint.

 
{code:java}
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reproduces this issue:
 * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
 * Did you change the partitioner to forward or rescale?
 * It may also help to add an explicit shuffle().
 */
public class UnalignedCheckpointBugDemo {
    private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port", "12348");
        // Unaligned checkpoints every 10s; the adaptive scheduler allows the job to be rescaled.
        conf.setString("execution.checkpointing.unaligned.enabled", "true");
        conf.setString("execution.checkpointing.interval", "10s");
        conf.setString("execution.checkpointing.min-pause", "8s");
        conf.setString("jobmanager.scheduler", "adaptive");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Without chaining, the forward edge between Source and "Map after forward" is a real exchange.
        env.disableOperatorChaining();
        env.setParallelism(5);
        SingleOutputStreamOperator<String> stream1 = env.fromSource(
                new DataGeneratorSource<>(
                        value -> new RandomDataGenerator().nextHexString(300),
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(100000),
                        Types.STRING),
                WatermarkStrategy.noWatermarks(),
                "Source Task");
        // Output exchange 1: hash (keyBy), which supports unaligned checkpoints.
        // The slow map causes backpressure, so in-flight buffers are likely captured in the checkpoint.
        stream1
                .keyBy(new KeySelectorFunction())
                .map(x -> {
                    Thread.sleep(50);
                    return x;
                }).name("Map after hash");
        // Output exchange 2: forward (same parallelism, no keyBy), which does not support unaligned checkpoints.
        stream1
                .map(x -> {
                    Thread.sleep(5);
                    return x;
                }).name("Map after forward");
        env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
    }

    /** Routes every record to the same key. */
    private static class KeySelectorFunction implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String value) throws Exception {
            return 0;
        }
    }
}
{code}
 
h1. 3. Solution

When the current task checks whether the upstream task has output buffer state, it should check only the corresponding exchange instead of all exchanges.

Likewise, when the current task checks whether the downstream task has input buffer state, it should check only the corresponding exchange instead of all exchanges.
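A minimal sketch of the proposed fix, reusing the same hypothetical model (names are illustrative, not Flink's API): the upstream output buffer state is looked up only for the exchange being restored. The output side would be symmetric, checking only the downstream input buffer state of the corresponding exchange.

{code:java}
import java.util.Map;

final class FixedCheckSketch {

    // Fixed check: only the output buffer state of the exchange being restored counts.
    static boolean upstreamHasOutputStateFor(Map<String, Boolean> upstreamOutputState, String exchange) {
        return upstreamOutputState.getOrDefault(exchange, false);
    }

    // Redistribution is skipped only if neither side has state for this particular exchange.
    static boolean redistributesInput(
            boolean currentHasInputState, Map<String, Boolean> upstreamOutputState, String exchange) {
        return currentHasInputState || upstreamHasOutputStateFor(upstreamOutputState, exchange);
    }

    public static void main(String[] args) {
        // Same example DAG: the Source only has in-flight buffers on its hash exchange.
        Map<String, Boolean> sourceOutputState = Map.of("hash", true, "forward", false);
        // "Map after forward": early return, no redistribution, so no exception.
        System.out.println(redistributesInput(false, sourceOutputState, "forward")); // false
        // "Map after hash": its in-flight buffers are still redistributed.
        System.out.println(redistributesInput(false, sourceOutputState, "hash")); // true
    }
}
{code}

With the per-exchange lookup, *Map after forward* takes the early return, while the hash exchange keeps its redistribution.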

 

[1] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]

[2] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]


> Job cannot be recovered from unaligned checkpoint after rescaling when one 
> task has multiple exchanges
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38267
>                 URL: https://issues.apache.org/jira/browse/FLINK-38267
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Rui Fan
>            Priority: Major
>         Attachments: image-2025-08-19-13-58-15-029.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
