[ 
https://issues.apache.org/jira/browse/FLINK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Salva updated FLINK-39131:
--------------------------
    Description: 
As a DataStream API user, I would like to have a built-in primitive that allows 
me to process multiple (more than 2) inputs within a single operator. This 
would make it easy to write multi-way joins, preventing the state-explosion 
issue recently handled in 
[FLIP-516|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator]
 for SQL.

The Operator API already contains all the necessary building blocks, and some 
users have reported successfully leveraging them. See, for example, the replies 
in this 
[thread|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator].

I'm personally doing this too, so I wonder whether something like this could be 
officially supported out of the box.

The basic idea, providing something that feels like a 
{{(Keyed)MultiProcessFunction}}, can be illustrated with the following test:
{code:java}
public class MultiInputITCase {
  StreamExecutionEnvironment env;
  DataStream<X> xs;
  DataStream<Y> ys;
  DataStream<Z> zs;

  @Before
  public void setup() {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    xs = env.fromData(new X("a", 10), new X("b", 100));
    ys = env.fromData(new Y("a", 20), new Y("b", 200));
    zs = env.fromData(new Z("a", 30), new Z("b", 300));
  }

  @Test
  public void testKeyedThreeWayJoin() throws Exception {
    KeyedMultiInputOperatorBuilder<String, Out> builder =
        new KeyedMultiInputOperatorBuilder<>(
            env,
            KeyedThreeInputOperator.class,
            TypeInformation.of(Out.class),
            Types.STRING
        );

    builder
        .addInput(xs, X::getKey)
        .addInput(ys, Y::getKey)
        .addInput(zs, Z::getKey);

    DataStream<Out> joined = builder.build("xyz-join");

    TestListResultSink<Out> resultSink = new TestListResultSink<>();
    joined.addSink(resultSink);

    env.execute("Keyed Three-Way Join Test");

    List<Out> result = resultSink.getResult();
    result.sort(Comparator.comparing(Out::getId).thenComparing(Out::getSum));

    assertEquals(2, result.size());
    assertEquals(new Out("a", 60), result.get(0));
    assertEquals(new Out("b", 600), result.get(1));
  }
} {code}
with the user-defined processor looking like this:
{code:java}
/**
 * Example of three-way keyed join:
 * <p>
 * X(id, x), Y(id, y), Z(id, z)  ->  Out(id, x + y + z)
 * <p>
 * Uses the KeyedMultiInputOperator3 base to get CoProcess-style handlers:
 * processElement1(X value, Context ctx, Collector<Out> out)
 * processElement2(Y value, Context ctx, Collector<Out> out)
 * processElement3(Z value, Context ctx, Collector<Out> out)
 */
public class KeyedThreeInputOperator
    extends KeyedMultiInputOperator3<X, Y, Z, Out> {

  private transient ValueState<Integer> lastX;
  private transient ValueState<Integer> lastY;
  private transient ValueState<Integer> lastZ;

  public KeyedThreeInputOperator(StreamOperatorParameters<Out> params) {
    super(params);
  }

  @Override
  public void open() throws Exception {
    super.open();

    var store = getKeyedStateStore()
        .orElseThrow(() ->
            new IllegalStateException("MultiJoinOperator requires keyed state"));

    lastX = store.getState(new ValueStateDescriptor<>("x", Types.INT));
    lastY = store.getState(new ValueStateDescriptor<>("y", Types.INT));
    lastZ = store.getState(new ValueStateDescriptor<>("z", Types.INT));
  }

  //
  // Input channel callbacks
  //

  @Override
  protected void processElement1(X x, Context ctx, Collector<Out> out)
      throws Exception {
    lastX.update(x.getX());
    join(ctx, out);
  }

  @Override
  protected void processElement2(Y y, Context ctx, Collector<Out> out)
      throws Exception {
    lastY.update(y.getY());
    join(ctx, out);
  }

  @Override
  protected void processElement3(Z z, Context ctx, Collector<Out> out)
      throws Exception {
    lastZ.update(z.getZ());
    join(ctx, out);
  }

  //
  // Join logic (take the sum of the 3 current values for each key)
  //

  private void join(Context ctx, Collector<Out> out) throws Exception {
    Integer a = lastX.value();
    Integer b = lastY.value();
    Integer c = lastZ.value();

    if (a != null && b != null && c != null) {
      String key = ctx.getCurrentKey(String.class);
      out.collect(new Out(key, a + b + c));
    }
  }
} {code}
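For readers without a Flink setup at hand, the per-key logic of the processor above can be modelled in plain Java. This is only a sketch under stated assumptions: the class {{ThreeWayJoinSketch}}, its {{onElement}} method, and the {{HashMap}} standing in for the three keyed {{ValueState}} handles are illustrative names, not part of Flink or of the proposed API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-key join logic; no Flink classes are involved. */
final class ThreeWayJoinSketch {

  // Hypothetical stand-in for the three keyed ValueState<Integer> handles:
  // one slot per input (0 = X, 1 = Y, 2 = Z) for every key.
  private final Map<String, Integer[]> lastByKey = new HashMap<>();
  private final List<String> emitted = new ArrayList<>();

  /** Stores the latest value for (key, input); emits "key=sum" once all three slots are set. */
  void onElement(int input, String key, int value) {
    Integer[] slots = lastByKey.computeIfAbsent(key, k -> new Integer[3]);
    slots[input] = value;
    if (slots[0] != null && slots[1] != null && slots[2] != null) {
      emitted.add(key + "=" + (slots[0] + slots[1] + slots[2]));
    }
  }

  List<String> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    ThreeWayJoinSketch join = new ThreeWayJoinSketch();
    // Same records as in the test, in one arbitrary interleaving.
    join.onElement(0, "a", 10);
    join.onElement(1, "b", 200);
    join.onElement(2, "a", 30);
    join.onElement(1, "a", 20);  // key "a" is now complete
    join.onElement(0, "b", 100);
    join.onElement(2, "b", 300); // key "b" is now complete
    System.out.println(join.emitted()); // prints [a=60, b=600]
  }
}
```

Note that this logic emits again on every later update for a key whose three slots are already filled. The test above yields exactly one result per key only because each input carries a single record per key.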
I can provide more details, but this should be enough to convey the suggestion.

  was:
As a DataStream API user, I would like to have a built-in primitive that allows 
me to process multiple (more than 2) inputs within a single operator. This 
would make it easy to write multi-way joins, preventing the state-explosion 
issue recently handled in 
[FLIP-516|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator]
 at the SQL level.

The Operator API already contains all the necessary building blocks, and some 
users have reported successfully leveraging them. See, for example, the replies 
in this 
[thread|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator].

I'm personally doing this too, so I wonder whether something like this could be 
officially supported out of the box.

A test for that would look like this:

```
{code:java}
public class MultiInputITCase {
  StreamExecutionEnvironment env;
  DataStream<X> xs;
  DataStream<Y> ys;
  DataStream<Z> zs;

  @Before
  public void setup() {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    xs = env.fromData(new X("a", 10), new X("b", 100));
    ys = env.fromData(new Y("a", 20), new Y("b", 200));
    zs = env.fromData(new Z("a", 30), new Z("b", 300));
  }

  @Test
  public void testKeyedThreeWayJoin() throws Exception {
    KeyedMultiInputOperatorBuilder<String, Out> builder =
        new KeyedMultiInputOperatorBuilder<>(
            env,
            KeyedThreeInputOperator.class,
            TypeInformation.of(Out.class),
            Types.STRING
        );

    builder
        .addInput(xs, X::getKey)
        .addInput(ys, Y::getKey)
        .addInput(zs, Z::getKey);

    DataStream<Out> joined = builder.build("xyz-join");

    TestListResultSink<Out> resultSink = new TestListResultSink<>();
    joined.addSink(resultSink);

    env.execute("Keyed Three-Way Join Test");

    List<Out> result = resultSink.getResult();
    result.sort(Comparator.comparing(Out::getId).thenComparing(Out::getSum));

    assertEquals(2, result.size());
    assertEquals(new Out("a", 60), result.get(0));
    assertEquals(new Out("b", 600), result.get(1));
  }
} {code}
with the user-defined processor looking like this:
{code:java}
/**
 * Example of three-way keyed join:
 * <p>
 * X(id, x), Y(id, y), Z(id, z)  ->  Out(id, x + y + z)
 * <p>
 * Uses the KeyedMultiInputOperator3 base to get CoProcess-style handlers:
 * processElement1(X value, Context ctx, Collector<Out> out)
 * processElement2(Y value, Context ctx, Collector<Out> out)
 * processElement3(Z value, Context ctx, Collector<Out> out)
 */
public class KeyedThreeInputOperator
    extends KeyedMultiInputOperator3<X, Y, Z, Out> {

  private transient ValueState<Integer> lastX;
  private transient ValueState<Integer> lastY;
  private transient ValueState<Integer> lastZ;

  public KeyedThreeInputOperator(StreamOperatorParameters<Out> params) {
    super(params);
  }

  @Override
  public void open() throws Exception {
    super.open();

    var store = getKeyedStateStore()
        .orElseThrow(() ->
            new IllegalStateException("MultiJoinOperator requires keyed state"));

    lastX = store.getState(new ValueStateDescriptor<>("x", Types.INT));
    lastY = store.getState(new ValueStateDescriptor<>("y", Types.INT));
    lastZ = store.getState(new ValueStateDescriptor<>("z", Types.INT));
  }

  //
  // Input channel callbacks
  //

  @Override
  protected void processElement1(X x, Context ctx, Collector<Out> out)
      throws Exception {
    lastX.update(x.getX());
    join(ctx, out);
  }

  @Override
  protected void processElement2(Y y, Context ctx, Collector<Out> out)
      throws Exception {
    lastY.update(y.getY());
    join(ctx, out);
  }

  @Override
  protected void processElement3(Z z, Context ctx, Collector<Out> out)
      throws Exception {
    lastZ.update(z.getZ());
    join(ctx, out);
  }

  //
  // Join logic (take the sum of the 3 current values for each key)
  //

  private void join(Context ctx, Collector<Out> out) throws Exception {
    Integer a = lastX.value();
    Integer b = lastY.value();
    Integer c = lastZ.value();

    if (a != null && b != null && c != null) {
      String key = ctx.getCurrentKey(String.class);
      out.collect(new Out(key, a + b + c));
    }
  }
} {code}
I can provide more details, but this should be enough to convey the suggestion.


> Multi-Input Processors
> ----------------------
>
>                 Key: FLINK-39131
>                 URL: https://issues.apache.org/jira/browse/FLINK-39131
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>            Reporter: Salva
>            Priority: Major
>
> As a DataStream API user, I would like to have a built-in primitive that 
> allows me to process multiple (more than 2) inputs within a single operator. 
> This would make it easy to write multi-way joins, preventing the 
> state-explosion issue recently handled in 
> [FLIP-516|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator]
>  for SQL.
> The Operator API already contains all the necessary building blocks, and some 
> users have reported successfully leveraging them. See, for example, the 
> replies in this 
> [thread|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator].
> I'm personally doing this too, so I wonder whether something like this could 
> be officially supported out of the box.
> The basic idea, providing something that feels like a 
> {{(Keyed)MultiProcessFunction}}, can be illustrated with the following 
> test:
> {code:java}
> public class MultiInputITCase {
>   StreamExecutionEnvironment env;
>   DataStream<X> xs;
>   DataStream<Y> ys;
>   DataStream<Z> zs;
>   @Before
>   public void setup() {
>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(2);
>     xs = env.fromData(new X("a", 10), new X("b", 100));
>     ys = env.fromData(new Y("a", 20), new Y("b", 200));
>     zs = env.fromData(new Z("a", 30), new Z("b", 300));
>   }
>   @Test
>   public void testKeyedThreeWayJoin() throws Exception {
>     KeyedMultiInputOperatorBuilder<String, Out> builder =
>         new KeyedMultiInputOperatorBuilder<>(
>             env,
>             KeyedThreeInputOperator.class,
>             TypeInformation.of(Out.class),
>             Types.STRING
>         );
>     builder
>         .addInput(xs, X::getKey)
>         .addInput(ys, Y::getKey)
>         .addInput(zs, Z::getKey);
>     DataStream<Out> joined = builder.build("xyz-join");
>     TestListResultSink<Out> resultSink = new TestListResultSink<>();
>     joined.addSink(resultSink);
>     env.execute("Keyed Three-Way Join Test");
>     List<Out> result = resultSink.getResult();
>     result.sort(Comparator.comparing(Out::getId).thenComparing(Out::getSum));
>     assertEquals(2, result.size());
>     assertEquals(new Out("a", 60), result.get(0));
>     assertEquals(new Out("b", 600), result.get(1));
>   }
> } {code}
> with the user-defined processor looking like this:
> {code:java}
> /**
>  * Example of three-way keyed join:
>  * <p>
>  * X(id, x), Y(id, y), Z(id, z)  ->  Out(id, x + y + z)
>  * <p>
>  * Uses the KeyedMultiInputOperator3 base to get CoProcess-style handlers:
>  * processElement1(X value, Context ctx, Collector<Out> out)
>  * processElement2(Y value, Context ctx, Collector<Out> out)
>  * processElement3(Z value, Context ctx, Collector<Out> out)
>  */
> public class KeyedThreeInputOperator
>     extends KeyedMultiInputOperator3<X, Y, Z, Out> {
>   private transient ValueState<Integer> lastX;
>   private transient ValueState<Integer> lastY;
>   private transient ValueState<Integer> lastZ;
>   public KeyedThreeInputOperator(StreamOperatorParameters<Out> params) {
>     super(params);
>   }
>   @Override
>   public void open() throws Exception {
>     super.open();
>     var store = getKeyedStateStore()
>         .orElseThrow(() ->
>             new IllegalStateException("MultiJoinOperator requires keyed state"));
>     lastX = store.getState(new ValueStateDescriptor<>("x", Types.INT));
>     lastY = store.getState(new ValueStateDescriptor<>("y", Types.INT));
>     lastZ = store.getState(new ValueStateDescriptor<>("z", Types.INT));
>   }
>   //
>   // Input channel callbacks
>   //
>   @Override
>   protected void processElement1(X x, Context ctx, Collector<Out> out)
>       throws Exception {
>     lastX.update(x.getX());
>     join(ctx, out);
>   }
>   @Override
>   protected void processElement2(Y y, Context ctx, Collector<Out> out)
>       throws Exception {
>     lastY.update(y.getY());
>     join(ctx, out);
>   }
>   @Override
>   protected void processElement3(Z z, Context ctx, Collector<Out> out)
>       throws Exception {
>     lastZ.update(z.getZ());
>     join(ctx, out);
>   }
>   //
>   // Join logic (take the sum of the 3 current values for each key)
>   //
>   private void join(Context ctx, Collector<Out> out) throws Exception {
>     Integer a = lastX.value();
>     Integer b = lastY.value();
>     Integer c = lastZ.value();
>     if (a != null && b != null && c != null) {
>       String key = ctx.getCurrentKey(String.class);
>       out.collect(new Out(key, a + b + c));
>     }
>   }
> } {code}
> I can provide more details, but this should be enough to convey the 
> suggestion.



