[
https://issues.apache.org/jira/browse/FLINK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Salva updated FLINK-39131:
--------------------------
Description:
As a DataStream API user, I would like to have a built-in primitive that allows
me to process multiple (N>2) inputs within a single operator, that is, going
beyond the current co-processors. This would not only dramatically improve
user ergonomics when writing multi-way joins, sparing users the tedium of
manually unioning/multiplexing all the inputs through a custom tuple, but also
help prevent the state-explosion issue recently handled in
[FLIP-516|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator]
for SQL.
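For readers unfamiliar with the workaround mentioned above, here is a minimal plain-Java sketch of what "union/multiplex with a custom tuple" tends to look like. It deliberately avoids Flink so that it runs on its own: the {{Tagged}} wrapper, the {{process}} loop and the in-memory map standing in for keyed state are all hypothetical names chosen for illustration, and every payload is simplified to an {{int}}, whereas real inputs would have different types and need casts or a wider wrapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnionWorkaroundSketch {

    /** Hypothetical wrapper: records which input a value came from, plus its key and payload. */
    static final class Tagged {
        final int input;   // 1 = X, 2 = Y, 3 = Z
        final String key;
        final int value;

        Tagged(int input, String key, int value) {
            this.input = input;
            this.key = key;
            this.value = value;
        }
    }

    /** Joined result: the key and the sum of the latest value seen on each input. */
    static final class Out {
        final String key;
        final int sum;

        Out(String key, int sum) {
            this.key = key;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "Out(" + key + ", " + sum + ")";
        }
    }

    /** Processes the unioned stream; the map stands in for per-key state. */
    static List<Out> process(List<Tagged> unioned) {
        Map<String, Integer[]> lastByKey = new HashMap<>();
        List<Out> out = new ArrayList<>();
        for (Tagged t : unioned) {
            Integer[] last = lastByKey.computeIfAbsent(t.key, k -> new Integer[3]);
            // Manual demultiplexing: the tag decides which slot is updated.
            last[t.input - 1] = t.value;
            // Emit only once all three inputs have been seen for this key.
            if (last[0] != null && last[1] != null && last[2] != null) {
                out.add(new Out(t.key, last[0] + last[1] + last[2]));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> unioned = Arrays.asList(
                new Tagged(1, "a", 10), new Tagged(2, "a", 20), new Tagged(3, "a", 30),
                new Tagged(1, "b", 100), new Tagged(2, "b", 200), new Tagged(3, "b", 300));
        System.out.println(process(unioned)); // prints [Out(a, 60), Out(b, 600)]
    }
}
```

The wrapper type, the tagging at each source and the dispatch on the tag are the boilerplate that a built-in multi-input primitive would remove.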
The Operator API already contains all the necessary building blocks, and some
users have reported successfully leveraging them, e.g., see the replies in this
[thread|https://lists.apache.org/[email protected]:2024-12:salva].
I'm personally doing this too, so I wonder whether something like this could be
officially supported—or at least documented.
The basic idea, providing something that feels like a
{{(Keyed)MultiProcessFunction}}, can be illustrated with the following test:
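{code:java}
import static org.junit.Assert.assertEquals;

import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.junit.Before;
import org.junit.Test;

// X, Y, Z, Out and KeyedMultiInputOperatorBuilder come from my library (not shown here).
public class MultiInputITCase {

    StreamExecutionEnvironment env;
    DataStream<X> xs;
    DataStream<Y> ys;
    DataStream<Z> zs;

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        xs = env.fromData(new X("a", 10), new X("b", 100));
        ys = env.fromData(new Y("a", 20), new Y("b", 200));
        zs = env.fromData(new Z("a", 30), new Z("b", 300));
    }

    @Test
    public void testKeyedThreeWayJoin() throws Exception {
        KeyedMultiInputOperatorBuilder<String, Out> builder =
                new KeyedMultiInputOperatorBuilder<>(
                        env,
                        KeyedThreeInputOperator.class,
                        TypeInformation.of(Out.class),
                        Types.STRING);

        // Each input is registered together with its key selector.
        builder
                .addInput(xs, X::getKey)
                .addInput(ys, Y::getKey)
                .addInput(zs, Z::getKey);

        DataStream<Out> joined = builder.build("xyz-join");

        TestListResultSink<Out> resultSink = new TestListResultSink<>();
        joined.addSink(resultSink);
        env.execute("Keyed Three-Way Join Test");

        // With parallelism 2 the output order is not deterministic, so sort before asserting.
        List<Out> result = resultSink.getResult();
        result.sort(Comparator.comparing(Out::getId).thenComparing(Out::getSum));

        assertEquals(2, result.size());
        assertEquals(new Out("a", 60), result.get(0));
        assertEquals(new Out("b", 600), result.get(1));
    }
}
{code}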
with the user-defined processor looking like this:
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.util.Collector;

/**
 * Example of three-way keyed join:
 * <p>
 * X(id, x), Y(id, y), Z(id, z) -> Out(id, x + y + z)
 * <p>
 * Uses the KeyedMultiInputOperator3 base to get CoProcess-style handlers:
 *   processElement1(X value, Context ctx, Collector<Out> out)
 *   processElement2(Y value, Context ctx, Collector<Out> out)
 *   processElement3(Z value, Context ctx, Collector<Out> out)
 */
public class KeyedThreeInputOperator
        extends KeyedMultiInputOperator3<X, Y, Z, Out> {

    // Latest value seen on each input for the current key.
    private transient ValueState<Integer> lastX;
    private transient ValueState<Integer> lastY;
    private transient ValueState<Integer> lastZ;

    public KeyedThreeInputOperator(StreamOperatorParameters<Out> params) {
        super(params);
    }

    @Override
    public void open() throws Exception {
        super.open();
        var store = getKeyedStateStore()
                .orElseThrow(() -> new IllegalStateException(
                        "KeyedThreeInputOperator requires keyed state"));
        lastX = store.getState(new ValueStateDescriptor<>("x", Types.INT));
        lastY = store.getState(new ValueStateDescriptor<>("y", Types.INT));
        lastZ = store.getState(new ValueStateDescriptor<>("z", Types.INT));
    }

    //
    // Input channel callbacks
    //

    @Override
    protected void processElement1(X x, Context ctx, Collector<Out> out) throws Exception {
        lastX.update(x.getX());
        join(ctx, out);
    }

    @Override
    protected void processElement2(Y y, Context ctx, Collector<Out> out) throws Exception {
        lastY.update(y.getY());
        join(ctx, out);
    }

    @Override
    protected void processElement3(Z z, Context ctx, Collector<Out> out) throws Exception {
        lastZ.update(z.getZ());
        join(ctx, out);
    }

    //
    // Join logic (take the sum of the 3 current values for each key)
    //

    private void join(Context ctx, Collector<Out> out) throws Exception {
        Integer a = lastX.value();
        Integer b = lastY.value();
        Integer c = lastZ.value();
        // Emit only once every input has contributed a value for this key.
        if (a != null && b != null && c != null) {
            String key = ctx.getCurrentKey(String.class);
            out.collect(new Out(key, a + b + c));
        }
    }
}
{code}
This is the simplest (N=3) case, but in my library I have generated variants up
to N=25 (KeyedMultiInputOperator3...KeyedMultiInputOperator25), along the lines
of what's done for
[Tuples|https://github.com/apache/flink/tree/master/flink-core-api/src/main/java/org/apache/flink/api/java/tuple].
I can provide more details, but this should be enough for the sake of
discussion/triaging.
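To give a rough idea of how the N-ary variants can be produced mechanically, here is a small, self-contained generator sketch. It is not the generator from my library: the emitted class shape (type parameters {{IN1..INn, OUT}} and one abstract {{processElementK}} handler per input) is an assumption modelled on the three-input example above, and the superclass is left out on purpose.

```java
public class MultiInputOperatorGenerator {

    /**
     * Returns the source skeleton of a hypothetical KeyedMultiInputOperator{n}
     * with one abstract handler per input.
     */
    static String generate(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be positive, got " + n);
        }
        StringBuilder typeParams = new StringBuilder();
        StringBuilder handlers = new StringBuilder();
        for (int i = 1; i <= n; i++) {
            // One type parameter and one handler per input channel.
            typeParams.append("IN").append(i).append(", ");
            handlers.append("    protected abstract void processElement").append(i)
                    .append("(IN").append(i)
                    .append(" value, Context ctx, Collector<OUT> out) throws Exception;\n");
        }
        return "public abstract class KeyedMultiInputOperator" + n
                + "<" + typeParams + "OUT> {\n"
                + handlers
                + "}\n";
    }

    public static void main(String[] args) {
        // Print the skeleton for the three-input case.
        System.out.println(generate(3));
    }
}
```

Calling {{generate(k)}} for k = 3..25 and writing each result to its own file yields the whole family, similar in spirit to how the Tuple classes are laid out.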
> Multi-Input Processors
> ----------------------
>
> Key: FLINK-39131
> URL: https://issues.apache.org/jira/browse/FLINK-39131
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Reporter: Salva
> Priority: Major