Jacob Jona Fahlenkamp created FLINK-38728:
---------------------------------------------
Summary: fullWindowPartition().reduce() is not being chained
Key: FLINK-38728
URL: https://issues.apache.org/jira/browse/FLINK-38728
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Affects Versions: 1.20.3
Reporter: Jacob Jona Fahlenkamp
Attachments: image-2025-11-25-12-25-13-583.png,
image-2025-11-25-12-34-28-789.png
We were hoping to use local-global aggregation to avoid persisting a large
amount of data in a batch job. This doesn't quite work because the reduce
operator is not chained to its upstream operator: its chaining strategy is not
overridden, so it uses the default HEAD strategy inherited from its superclass.
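To make the intent concrete, here is a minimal plain-Java sketch of local-global aggregation. It does not use Flink at all; the class and method names (LocalGlobalSketch, reduceAll, localGlobalReduce) are made up for illustration. Each partition is first reduced to one partial result, and only those partial results are combined globally, which is why the local step should ideally run in the same task as the operator that produces the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class LocalGlobalSketch {

    /** Folds a list into a single value; returns null for an empty list. */
    static <T> T reduceAll(List<T> values, BinaryOperator<T> fn) {
        T acc = null;
        for (T v : values) {
            acc = (acc == null) ? v : fn.apply(acc, v);
        }
        return acc;
    }

    /** Local step per partition, then a global step over the partial results. */
    static <T> T localGlobalReduce(List<List<T>> partitions, BinaryOperator<T> fn) {
        List<T> partials = new ArrayList<>();
        for (List<T> partition : partitions) {
            T partial = reduceAll(partition, fn); // local aggregation
            if (partial != null) {
                partials.add(partial);
            }
        }
        return reduceAll(partials, fn); // global aggregation
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = List.of(List.of(1L, 2L, 3L), List.of(4L, 5L));
        Long total = localGlobalReduce(partitions, Long::sum);
        System.out.println(total); // prints 15
    }
}
```

Only one value per partition reaches the global step, so the amount of data that has to cross a task boundary is bounded by the number of partitions rather than the number of records.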
I think the solution would be as simple as adding this line in the constructor
of
[PartitionReduceOperator|https://github.com/apache/flink/blob/1cc2f147ddf0ffd4ce37be7c6f55355cf34fa907/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java]
{code:java}
public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
    super(reduceFunction);
    this.reduceFunction = reduceFunction;
    this.chainingStrategy = ChainingStrategy.ALWAYS; // add this line
}
{code}
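As a rough illustration of why the inherited default matters, the following is a toy model in plain Java, not Flink's actual code. The names ChainingSketch, Strategy, BaseOperator, ReduceWithDefault, ReduceWithFix and canChainToUpstream are all hypothetical, and the rule is deliberately simplified: the real check also takes parallelism, the partitioner, slot sharing groups and the global chaining setting into account, and the real enum has more values.

```java
public class ChainingSketch {

    // Simplified stand-in for a chaining strategy enum (assumption: three values only).
    enum Strategy { ALWAYS, NEVER, HEAD }

    // Hypothetical base class whose default strategy is HEAD, as described in this report.
    static class BaseOperator {
        protected Strategy chainingStrategy = Strategy.HEAD;
    }

    // Keeps the inherited default: models the behaviour reported here.
    static class ReduceWithDefault extends BaseOperator {}

    // Overrides the default in its constructor: models the proposed one-line fix.
    static class ReduceWithFix extends BaseOperator {
        ReduceWithFix() {
            this.chainingStrategy = Strategy.ALWAYS;
        }
    }

    // Simplified rule: an operator joins its upstream's chain only if it is ALWAYS
    // and the upstream operator is not NEVER. A HEAD operator always starts a new chain.
    static boolean canChainToUpstream(BaseOperator upstream, BaseOperator downstream) {
        return downstream.chainingStrategy == Strategy.ALWAYS
                && upstream.chainingStrategy != Strategy.NEVER;
    }

    public static void main(String[] args) {
        BaseOperator upstream = new BaseOperator();
        System.out.println(canChainToUpstream(upstream, new ReduceWithDefault())); // prints false
        System.out.println(canChainToUpstream(upstream, new ReduceWithFix()));     // prints true
    }
}
```

Under this simplified rule, an operator that keeps HEAD can have successors chained to it but is never chained to its predecessor, which matches the unchained job graph shown further down.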
Here is a simple test to demonstrate this behavior:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class FlinkTest {
    private static final Configuration FLINK_CONFIG =
            new Configuration().set(RestOptions.PORT, 8081);

    @RegisterExtension
    public static final MiniClusterExtension FLINK_CLUSTER = new MiniClusterExtension(
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .setConfiguration(FLINK_CONFIG)
                    .build());

    // Default behavior: the reduce operator keeps the HEAD strategy.
    @Test
    void noChaining() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        var datagen = new DataGeneratorSource<>(
                x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
        var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
                .fullWindowPartition().reduce(Long::sum);
        reduce.print();
        env.execute();
    }

    // Workaround: force ChainingStrategy.ALWAYS via reflection before executing.
    @Test
    void withChaining() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        var datagen = new DataGeneratorSource<>(
                x -> x, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
        var reduce = env.fromSource(datagen, WatermarkStrategy.noWatermarks(), "source")
                .fullWindowPartition().reduce(Long::sum);
        reduce.print();
        // setChainingStrategy is private on SingleOutputStreamOperator, hence the reflection.
        var m = SingleOutputStreamOperator.class
                .getDeclaredMethod("setChainingStrategy", ChainingStrategy.class);
        m.setAccessible(true);
        m.invoke(reduce, ChainingStrategy.ALWAYS);
        env.execute();
    }
}
{code}
The first test, "noChaining", generates a job graph like this:
!image-2025-11-25-12-25-13-583.png!
As you can see, the operators are not chained.
In the second test, "withChaining", I use reflection to call the private method
"setChainingStrategy" on the SingleOutputStreamOperator, which sets the chaining
strategy of the underlying PartitionReduceOperator to ALWAYS. This produces the
expected result, with the operators chained:
!image-2025-11-25-12-34-28-789.png!