Jacob Jona Fahlenkamp created FLINK-38728:
---------------------------------------------

             Summary: fullWindowPartition().reduce() is not being chained
                 Key: FLINK-38728
                 URL: https://issues.apache.org/jira/browse/FLINK-38728
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
    Affects Versions: 1.20.3
            Reporter: Jacob Jona Fahlenkamp
         Attachments: image-2025-11-25-12-25-13-583.png, 
image-2025-11-25-12-34-28-789.png

We were hoping to use local-global aggregation to avoid persisting a large 
amount of data in a batch job. This doesn't quite work, because the reduce 
operator is not chained: its chaining strategy is not overridden, so it uses 
the default HEAD strategy from its superclass instead.
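
For readers unfamiliar with the term, here is a minimal plain-Java sketch of what I mean by local-global aggregation. It is not Flink code; the class and method names are made up for illustration. Each partition is reduced on its own first, and only the small partial results are combined in a second step:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class LocalGlobalReduce {

    /** Reduces every partition locally, then reduces the partial results globally. */
    public static <T> Optional<T> reduce(List<List<T>> partitions, BinaryOperator<T> fn) {
        // Local step: at most one partial result per partition (empty partitions yield none).
        List<T> partials = partitions.stream()
                .map(partition -> partition.stream().reduce(fn))
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
        // Global step: combine the partial results into a single value.
        return partials.stream().reduce(fn);
    }

    public static void main(String[] args) {
        List<List<Long>> partitions =
                List.of(List.of(1L, 2L, 3L), List.of(4L, 5L), List.<Long>of());
        System.out.println(reduce(partitions, Long::sum).orElseThrow()); // prints 15
    }
}
```

The point of the local step is that only one value per partition has to cross the partition boundary, which only pays off if the local reduce runs in the same task as the operator feeding it.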

I think the solution would be as simple as adding this line to the constructor of 
[PartitionReduceOperator|https://github.com/apache/flink/blob/1cc2f147ddf0ffd4ce37be7c6f55355cf34fa907/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java]:
{code:java}
public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
    super(reduceFunction);
    this.reduceFunction = reduceFunction;
    this.chainingStrategy = ChainingStrategy.ALWAYS; // add this line
}
{code}
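To illustrate why that single assignment matters, here is a toy model in plain Java. It is not Flink code: the class names are hypothetical stand-ins, and the chaining rule is deliberately reduced to "only ALWAYS may be chained to its predecessor", whereas the real job graph generator checks more conditions.

```java
public class ChainingDefaultSketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    /** Stand-in for a base operator class whose default strategy is HEAD. */
    static class BaseOperator {
        protected Strategy chainingStrategy = Strategy.HEAD;

        Strategy getChainingStrategy() {
            return chainingStrategy;
        }
    }

    /** Subclass that never touches the field, so it inherits HEAD. */
    static class ReduceWithoutOverride extends BaseOperator {}

    /** Subclass that sets the strategy in its constructor, as proposed above. */
    static class ReduceWithOverride extends BaseOperator {
        ReduceWithOverride() {
            this.chainingStrategy = Strategy.ALWAYS;
        }
    }

    /** Simplified rule (assumption): HEAD starts a new chain, ALWAYS may join one. */
    static boolean canChainToUpstream(BaseOperator op) {
        return op.getChainingStrategy() == Strategy.ALWAYS;
    }

    public static void main(String[] args) {
        System.out.println(canChainToUpstream(new ReduceWithoutOverride())); // prints false
        System.out.println(canChainToUpstream(new ReduceWithOverride()));    // prints true
    }
}
```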
Here is a simple test to demonstrate this behavior:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class FlinkTest {
    private static final Configuration FLINK_CONFIG =
            new Configuration().set(RestOptions.PORT, 8081);

    @RegisterExtension
    public static final MiniClusterExtension FLINK_CLUSTER = new MiniClusterExtension(
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .setConfiguration(FLINK_CONFIG)
                    .build());

    @Test
    void noChaining() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        var datagen = new DataGeneratorSource<>(
                x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
        var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
                .fullWindowPartition()
                .reduce(Long::sum);
        reduce.print();
        env.execute();
    }

    @Test
    void withChaining() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        var datagen = new DataGeneratorSource<>(
                x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
        var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
                .fullWindowPartition()
                .reduce(Long::sum);
        reduce.print();
        // setChainingStrategy is private, so it has to be invoked via reflection.
        var m = SingleOutputStreamOperator.class
                .getDeclaredMethod("setChainingStrategy", ChainingStrategy.class);
        m.setAccessible(true);
        m.invoke(reduce, ChainingStrategy.ALWAYS);
        env.execute();
    }
}
{code}
The first function "noChaining" generates a job graph like this:

!image-2025-11-25-12-25-13-583.png!

As you can see, the operators are not chained.

In the second function "withChaining" I use reflection to call the private 
method "setChainingStrategy" on the SingleOutputStreamOperator, which sets the 
chaining strategy of the reduce operator to ALWAYS. This produces the expected 
result, with the operators being chained:

!image-2025-11-25-12-34-28-789.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
