[
https://issues.apache.org/jira/browse/FLINK-38728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhe Wang updated FLINK-38728:
-----------------------------
Attachment: 截屏2026-01-23 00.45.15.png
> fullWindowPartition().reduce() is not being chained
> ---------------------------------------------------
>
> Key: FLINK-38728
> URL: https://issues.apache.org/jira/browse/FLINK-38728
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.20.3
> Reporter: Jacob Jona Fahlenkamp
> Priority: Minor
> Attachments: image-2025-11-25-12-25-13-583.png,
> image-2025-11-25-12-34-28-789.png, 截屏2026-01-23 00.45.15.png
>
>
> We were hoping to use local-global aggregation to avoid persisting a large
> amount of data in a batch job. This doesn't quite work, because the reduce
> operator is not chained: its chaining strategy is not overridden, so it
> falls back to the default HEAD strategy inherited from its superclass.
> I think the solution would be as simple as adding this line in the
> constructor of
> [PartitionReduceOperator|https://github.com/apache/flink/blob/1cc2f147ddf0ffd4ce37be7c6f55355cf34fa907/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java]
> {code:java}
> public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
>     super(reduceFunction);
>     this.reduceFunction = reduceFunction;
>     this.chainingStrategy = ChainingStrategy.ALWAYS; // add this line
> }
> {code}
> Here is a simple test to demonstrate this behavior:
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.operators.ChainingStrategy;
> import org.apache.flink.test.junit5.MiniClusterExtension;
> import org.junit.jupiter.api.Test;
> import org.junit.jupiter.api.extension.RegisterExtension;
>
> public class FlinkTest {
>     private static final Configuration FLINK_CONFIG =
>             new Configuration().set(RestOptions.PORT, 8081);
>
>     @RegisterExtension
>     public static final MiniClusterExtension FLINK_CLUSTER = new MiniClusterExtension(
>             new MiniClusterResourceConfiguration.Builder()
>                     .setNumberSlotsPerTaskManager(2)
>                     .setNumberTaskManagers(1)
>                     .setConfiguration(FLINK_CONFIG)
>                     .build());
>
>     // Default behavior: the reduce operator keeps its inherited chaining strategy.
>     @Test
>     void noChaining() throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
>         var datagen = new DataGeneratorSource<>(
>                 x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
>         var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
>                 .fullWindowPartition()
>                 .reduce(Long::sum);
>         reduce.print();
>         env.execute();
>     }
>
>     // Workaround: force ChainingStrategy.ALWAYS on the reduce result via reflection.
>     @Test
>     void withChaining() throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
>         var datagen = new DataGeneratorSource<>(
>                 x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
>         var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
>                 .fullWindowPartition()
>                 .reduce(Long::sum);
>         reduce.print();
>         // setChainingStrategy is not public, so it is looked up and invoked reflectively.
>         var m = SingleOutputStreamOperator.class
>                 .getDeclaredMethod("setChainingStrategy", ChainingStrategy.class);
>         m.setAccessible(true);
>         m.invoke(reduce, ChainingStrategy.ALWAYS);
>         env.execute();
>     }
> }
> {code}
> The first function "noChaining" generates a job graph like this:
> !image-2025-11-25-12-25-13-583.png!
> As you can see, the operators are not chained.
> In the second function "withChaining" I use reflection to invoke the private
> method "setChainingStrategy" on the SingleOutputStreamOperator returned by
> reduce(). This produces the expected result, with the operators being chained:
> !image-2025-11-25-12-34-28-789.png!
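
The difference between the two job graphs can be illustrated with a small toy model of the chaining decision. This is only a sketch, not Flink's implementation: it assumes a linear pipeline and ignores the other conditions Flink checks (parallelism, partitioner, slot sharing group, and so on). The class ChainingSketch, its Op type, and the strategies given to "source" and "print" are hypothetical and chosen for illustration; only the strategy names HEAD, ALWAYS and NEVER are taken from Flink's ChainingStrategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of operator chaining; NOT Flink's actual implementation.
class ChainingSketch {
    // Mirrors the names of three Flink ChainingStrategy values.
    enum Strategy { ALWAYS, HEAD, NEVER }

    // Hypothetical stand-in for one operator in a linear pipeline.
    static final class Op {
        final String name;
        final Strategy strategy;

        Op(String name, Strategy strategy) {
            this.name = name;
            this.strategy = strategy;
        }
    }

    // Groups a linear pipeline into chains. In this simplified model an
    // operator joins its predecessor's chain only if its own strategy is
    // ALWAYS and the predecessor is not NEVER; otherwise it starts a new chain.
    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean chainable = previous != null
                    && op.strategy == Strategy.ALWAYS
                    && previous.strategy != Strategy.NEVER;
            if (!chainable) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(op.name);
            previous = op;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed strategies: source = HEAD, print = ALWAYS (illustrative only).
        List<Op> current = Arrays.asList(
                new Op("source", Strategy.HEAD),
                new Op("reduce", Strategy.HEAD),   // default inherited strategy
                new Op("print", Strategy.ALWAYS));
        List<Op> proposed = Arrays.asList(
                new Op("source", Strategy.HEAD),
                new Op("reduce", Strategy.ALWAYS), // proposed one-line change
                new Op("print", Strategy.ALWAYS));

        System.out.println(chains(current));  // [[source], [reduce, print]]
        System.out.println(chains(proposed)); // [[source, reduce, print]]
    }
}
```

Under these assumptions, a reduce operator with HEAD always starts a new chain, so it is cut off from the source, while ALWAYS lets it join the chain of the operator before it.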