[
https://issues.apache.org/jira/browse/FLINK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060735#comment-18060735
]
Salva edited comment on FLINK-39131 at 2/24/26 4:57 PM:
--------------------------------------------------------
An alternative would be to support side inputs as described in
[FLIP-17.|https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API]
was (Author: JIRAUSER287051):
An alternative would be to support side inputs as described in
[FLIP-17|https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API].
> Multi-Input Processors
> ----------------------
>
> Key: FLINK-39131
> URL: https://issues.apache.org/jira/browse/FLINK-39131
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Reporter: Salva
> Priority: Major
>
> As a DataStream API user, I would like to have a built-in primitive that
> allows me to process multiple (N>2) inputs within a single operator, that is,
> going beyond the current two-input co-processors. This would not only
> dramatically improve user ergonomics when writing multi-way joins (saving the
> tedium of having to manually union/multiplex all the inputs using a custom
> tuple) but also help prevent the state-explosion issue recently handled in
> [FLIP-516|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator]
> for SQL.
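For readers unfamiliar with the workaround mentioned above, the following is a minimal sketch of what "union/multiplex with a custom tuple" tends to look like. It deliberately uses plain Java with no Flink dependency, and every name in it (`Tagged`, `UnionJoin`, the integer input tags) is hypothetical and not taken from the issue or from Flink. The point is only to show the boilerplate: each input is wrapped in a tagged envelope, and the single handler has to demultiplex by tag again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical envelope: wraps a record from any of the three inputs so that
// all inputs can be merged into one stream of a single type.
final class Tagged {
    final int input;   // which logical input the record came from: 0 = X, 1 = Y, 2 = Z
    final String key;  // join key
    final int value;   // payload, reduced to an int for brevity

    Tagged(int input, String key, int value) {
        this.input = input;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical single-input handler that consumes the merged stream.
final class UnionJoin {
    // Stand-in for keyed state: last value seen per input, per key (null = not seen yet).
    private final Map<String, Integer[]> last = new HashMap<>();

    // Demultiplexes by tag, updates the slot for that input, and returns the sum
    // once all three inputs have been seen for the key (otherwise null).
    Integer process(Tagged t) {
        Integer[] slots = last.computeIfAbsent(t.key, k -> new Integer[3]);
        slots[t.input] = t.value;
        for (Integer slot : slots) {
            if (slot == null) {
                return null;
            }
        }
        return slots[0] + slots[1] + slots[2];
    }

    // Feeds an already-merged sequence of records through the handler and
    // collects one "key=sum" string per emitted result.
    static List<String> run(List<Tagged> unioned) {
        UnionJoin join = new UnionJoin();
        List<String> out = new ArrayList<>();
        for (Tagged t : unioned) {
            Integer sum = join.process(t);
            if (sum != null) {
                out.add(t.key + "=" + sum);
            }
        }
        return out;
    }
}
```

With typed per-input handlers, the envelope class, the tag constants, and the manual dispatch would all disappear, which is the ergonomic gain the description is asking for.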
> The Operator API already contains all the necessary building blocks, and some
> users have reported successfully leveraging them, e.g., see the replies in
> this
> [thread|https://lists.apache.org/thread/mt4pb9z55d4p5m0wdwrsoxnpcc5hon0n].
> I'm personally doing this too, so I wonder whether something like this could
> be officially supported—or at least documented.
> The basic idea—providing something that feels like a
> {{{}(Keyed)MultiProcessFunction{}}}—can be illustrated with the following
> test:
> {code:java}
> public class MultiInputITCase {
>     StreamExecutionEnvironment env;
>     DataStream<X> xs;
>     DataStream<Y> ys;
>     DataStream<Z> zs;
>
>     @Before
>     public void setup() {
>         env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         xs = env.fromData(new X("a", 10), new X("b", 100));
>         ys = env.fromData(new Y("a", 20), new Y("b", 200));
>         zs = env.fromData(new Z("a", 30), new Z("b", 300));
>     }
>
>     @Test
>     public void testKeyedThreeWayJoin() throws Exception {
>         KeyedMultiInputOperatorBuilder<String, Out> builder =
>             new KeyedMultiInputOperatorBuilder<>(
>                 env,
>                 KeyedThreeInputOperator.class,
>                 TypeInformation.of(Out.class),
>                 Types.STRING
>             );
>         builder
>             .addInput(xs, X::getKey)
>             .addInput(ys, Y::getKey)
>             .addInput(zs, Z::getKey);
>         DataStream<Out> joined = builder.build("xyz-join");
>
>         TestListResultSink<Out> resultSink = new TestListResultSink<>();
>         joined.addSink(resultSink);
>         env.execute("Keyed Three-Way Join Test");
>
>         List<Out> result = resultSink.getResult();
>         result.sort(Comparator.comparing(Out::getId).thenComparing(Out::getSum));
>         assertEquals(2, result.size());
>         assertEquals(new Out("a", 60), result.get(0));
>         assertEquals(new Out("b", 600), result.get(1));
>     }
> } {code}
> with the user-defined processor looking like this:
> {code:java}
> /**
>  * Example of three-way keyed join:
>  * <p>
>  * X(id, x), Y(id, y), Z(id, z) -> Out(id, x + y + z)
>  * <p>
>  * Uses the KeyedMultiInputOperator3 base to get CoProcess-style handlers:
>  *   processElement1(X value, Context ctx, Collector<Out> out)
>  *   processElement2(Y value, Context ctx, Collector<Out> out)
>  *   processElement3(Z value, Context ctx, Collector<Out> out)
>  */
> public class KeyedThreeInputOperator
>         extends KeyedMultiInputOperator3<X, Y, Z, Out> {
>     private transient ValueState<Integer> lastX;
>     private transient ValueState<Integer> lastY;
>     private transient ValueState<Integer> lastZ;
>
>     public KeyedThreeInputOperator(StreamOperatorParameters<Out> params) {
>         super(params);
>     }
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>         var store = getKeyedStateStore()
>             .orElseThrow(() -> new IllegalStateException("MultiJoinOperator requires keyed state"));
>         lastX = store.getState(new ValueStateDescriptor<>("x", Types.INT));
>         lastY = store.getState(new ValueStateDescriptor<>("y", Types.INT));
>         lastZ = store.getState(new ValueStateDescriptor<>("z", Types.INT));
>     }
>
>     //
>     // Input channel callbacks
>     //
>     @Override
>     protected void processElement1(X x, Context ctx, Collector<Out> out) throws Exception {
>         lastX.update(x.getX());
>         join(ctx, out);
>     }
>
>     @Override
>     protected void processElement2(Y y, Context ctx, Collector<Out> out) throws Exception {
>         lastY.update(y.getY());
>         join(ctx, out);
>     }
>
>     @Override
>     protected void processElement3(Z z, Context ctx, Collector<Out> out) throws Exception {
>         lastZ.update(z.getZ());
>         join(ctx, out);
>     }
>
>     //
>     // Join logic (take the sum of the 3 current values for each key)
>     //
>     private void join(Context ctx, Collector<Out> out) throws Exception {
>         Integer a = lastX.value();
>         Integer b = lastY.value();
>         Integer c = lastZ.value();
>         if (a != null && b != null && c != null) {
>             String key = ctx.getCurrentKey(String.class);
>             out.collect(new Out(key, a + b + c));
>         }
>     }
> } {code}
> This is the simplest (N=3) case, but in my library I have generated up to
> N=25 (KeyedMultiInputOperator3...KeyedMultiInputOperator25) along the lines
> of what's done for
> [Tuples|https://github.com/apache/flink/tree/master/flink-core-api/src/main/java/org/apache/flink/api/java/tuple].
> I can provide more details but that should be enough for the sake of
> discussion/triaging.
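As a rough illustration of the per-arity generation described above, here is a minimal sketch of a source generator that emits one abstract handler per input for a given N. It is an assumption about how such generation could work, not the reporter's actual generator: the `ArityGenerator` class, the `IN1..INn`/`OUT` type-parameter names, and the emitted skeleton (which omits the real base class, constructor, and state plumbing) are all hypothetical. Only the `KeyedMultiInputOperatorN` naming and the `processElementN(value, ctx, out)` handler shape follow the example in the description.

```java
// Hypothetical generator for arity-specific operator skeletons.
final class ArityGenerator {
    // Returns Java source text for a skeleton abstract class with n typed handlers.
    static String generate(int n) {
        if (n < 2) {
            throw new IllegalArgumentException("need at least 2 inputs, got " + n);
        }

        // Type parameters: IN1, IN2, ..., INn, OUT
        StringBuilder typeParams = new StringBuilder();
        for (int i = 1; i <= n; i++) {
            typeParams.append("IN").append(i).append(", ");
        }
        typeParams.append("OUT");

        StringBuilder src = new StringBuilder();
        src.append("public abstract class KeyedMultiInputOperator").append(n)
           .append("<").append(typeParams).append("> {\n");

        // One abstract handler per input, mirroring the CoProcess-style signatures.
        for (int i = 1; i <= n; i++) {
            src.append("    protected abstract void processElement").append(i)
               .append("(IN").append(i)
               .append(" value, Context ctx, Collector<OUT> out) throws Exception;\n");
        }
        src.append("}\n");
        return src.toString();
    }
}
```

Calling `ArityGenerator.generate(n)` in a loop for n from 3 to 25 would produce one skeleton per arity, which is the same trade-off the Tuple classes make: more generated code in exchange for fully typed per-input signatures.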