[ 
https://issues.apache.org/jira/browse/FLINK-38728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhe Wang updated FLINK-38728:
-----------------------------
    Attachment: 截屏2026-01-23 00.45.15.png

> fullWindowPartition().reduce() is not being chained
> ---------------------------------------------------
>
>                 Key: FLINK-38728
>                 URL: https://issues.apache.org/jira/browse/FLINK-38728
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.20.3
>            Reporter: Jacob Jona Fahlenkamp
>            Priority: Minor
>         Attachments: image-2025-11-25-12-25-13-583.png, 
> image-2025-11-25-12-34-28-789.png, 截屏2026-01-23 00.45.15.png
>
>
> We were hoping to do local/global aggregation, to avoid persisting a large 
> amount of data in a batch job. This doesn't quite work, because the reduce 
> operator is not chained: its chaining strategy is never overridden, so it 
> falls back to the default HEAD strategy inherited from its super class.
> I think the fix could be as simple as adding one line to the constructor of 
> [PartitionReduceOperator|https://github.com/apache/flink/blob/1cc2f147ddf0ffd4ce37be7c6f55355cf34fa907/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java]:
> {code:java}
> public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
>     super(reduceFunction);
>     this.reduceFunction = reduceFunction;
>     this.chainingStrategy = ChainingStrategy.ALWAYS; // add this line
> }
> {code}
> Here is a simple test to demonstrate this behavior:
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.operators.ChainingStrategy;
> import org.apache.flink.test.junit5.MiniClusterExtension;
> import org.junit.jupiter.api.Test;
> import org.junit.jupiter.api.extension.RegisterExtension;
> 
> public class FlinkTest {
>     private static final Configuration FLINK_CONFIG = new Configuration().set(RestOptions.PORT, 8081);
> 
>     @RegisterExtension
>     public static final MiniClusterExtension FLINK_CLUSTER = new MiniClusterExtension(
>           new MiniClusterResourceConfiguration.Builder()
>                 .setNumberSlotsPerTaskManager(2)
>                 .setNumberTaskManagers(1)
>                 .setConfiguration(FLINK_CONFIG)
>                 .build());
> 
>     @Test
>     void noChaining() throws Exception {
>        var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
>        var datagen = new DataGeneratorSource<>(x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
>        var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
>                        .fullWindowPartition().reduce(Long::sum);
>        reduce.print();
>        env.execute();
>     }
> 
>     @Test
>     void withChaining() throws Exception {
>        var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
>        var datagen = new DataGeneratorSource<>(x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
>        var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
>                        .fullWindowPartition().reduce(Long::sum);
>        reduce.print();
>        // Use reflection to force ChainingStrategy.ALWAYS on the reduce operator.
>        var m = SingleOutputStreamOperator.class.getDeclaredMethod("setChainingStrategy", ChainingStrategy.class);
>        m.setAccessible(true);
>        m.invoke(reduce, ChainingStrategy.ALWAYS);
>        env.execute();
>     }
> }
> {code}
> The first test, "noChaining", generates a job graph like this:
> !image-2025-11-25-12-25-13-583.png!
> As you can see, the operators are not chained.
> In the second test, "withChaining", I use reflection to invoke the non-public 
> method "setChainingStrategy", which sets ChainingStrategy.ALWAYS on the 
> PartitionReduceOperator. This produces the expected result, with the 
> operators chained:
> !image-2025-11-25-12-34-28-789.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
