This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ee4b58e38 [FLINK-37763][table] Support multiple table arguments in 
PTFs
86ee4b58e38 is described below

commit 86ee4b58e3878335d15aeb425025dd8b05973823
Author: Timo Walther <[email protected]>
AuthorDate: Wed May 28 21:43:08 2025 +0200

    [FLINK-37763][table] Support multiple table arguments in PTFs
    
    This closes #26600.
---
 docs/content.zh/docs/dev/table/functions/ptfs.md   | 163 +++++++++++++++------
 docs/content/docs/dev/table/functions/ptfs.md      | 163 +++++++++++++++------
 .../generated/optimizer_config_configuration.html  |   6 +
 .../KeyedMultipleInputTransformation.java          |  12 ++
 .../table/api/config/OptimizerConfigOptions.java   |  11 ++
 .../resolver/rules/ResolveCallByArgumentsRule.java |   5 -
 .../flink/table/annotation/ArgumentTrait.java      |   6 +-
 .../flink/table/functions/ChangelogFunction.java   |   3 +-
 .../flink/table/functions/TableSemantics.java      |  10 --
 .../table/types/inference/SystemTypeInference.java | 137 ++++++++++++-----
 .../inference/CallBindingCallContext.java          |   5 -
 .../inference/OperatorBindingCallContext.java      |   5 -
 .../planner/plan/QueryOperationConverter.java      |  11 +-
 .../stream/StreamExecProcessTableFunction.java     |  99 ++++++++-----
 .../plan/nodes/exec/utils/ExecNodeUtil.java        |  30 ++++
 .../stream/StreamPhysicalProcessTableFunction.java |  62 ++++++--
 .../codegen/ProcessTableRunnerGenerator.scala      |   7 +-
 .../plan/metadata/FlinkRelMdUniqueKeys.scala       |   3 +-
 .../FlinkChangelogModeInferenceProgram.scala       |  13 --
 .../stream/ProcessTableFunctionSemanticTests.java  |   5 +-
 .../stream/ProcessTableFunctionTestPrograms.java   | 109 ++++++++++++++
 .../exec/stream/ProcessTableFunctionTestUtils.java | 142 ++++++++++++++++--
 .../plan/stream/sql/ProcessTableFunctionTest.java  | 105 +++++++++++--
 .../runtime/generated/ProcessTableRunner.java      |  12 +-
 ...ator.java => AbstractProcessTableOperator.java} |  73 ++++-----
 .../operators/process/PassAllCollector.java        |  10 +-
 .../process/PassPartitionKeysCollector.java        |  24 ++-
 .../process/PassThroughCollectorBase.java          |  22 ++-
 .../operators/process/ProcessRowTableOperator.java |  95 ++++++++++++
 .../operators/process/ProcessSetTableOperator.java |  82 +++++++++++
 .../process/ProcessTableOperatorFactory.java       |  45 +++---
 .../runtime/operators/process/RepeatedRowData.java | 145 ++++++++++++++++++
 .../operators/process/RuntimeTableSemantics.java   |   6 -
 33 files changed, 1290 insertions(+), 336 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md 
b/docs/content.zh/docs/dev/table/functions/ptfs.md
index 104274b50ce..f3c94960533 100644
--- a/docs/content.zh/docs/dev/table/functions/ptfs.md
+++ b/docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -1015,6 +1015,107 @@ not needed anymore via `Context#clearAllTimers()` or 
`TimeContext#clearTimer(Str
 
 {{< top >}}
 
+Multiple Tables
+---------------
+
+A PTF can process multiple tables simultaneously. This enables a variety of 
use cases, including:
+
+- Implementing **custom joins** that efficiently manage state.
+- Enriching the main table with information from dimension tables as **side 
inputs**.
+- Sending **control events** to the keyed virtual processor during runtime.
+
+The `eval()` method can specify multiple table arguments to support multiple 
inputs. All table arguments must be declared
+with set semantics and use consistent partitioning. In other words, the number 
of columns and their data types in the
+`PARTITION BY` clause must match across all involved table arguments.
+
+Rows from either input are passed to the function one at a time. Thus, only 
one table argument is non-null at a time. Use
+null checks to determine which input is currently being processed.
+
+{{< hint warning >}}
+The system decides which input row is streamed through the virtual processor 
next. If not handled properly in the PTF,
+this can lead to race conditions between inputs and, consequently, to 
non-deterministic results. It is recommended to
+design the function in such a way that the join is either time-based (i.e., 
waiting for all rows to arrive up to a given
+watermark) or condition-based, where the PTF buffers one or more input rows 
until a specific condition is met.
+{{< /hint >}}
+
+### Example: Custom Join
+
+The following example illustrates how to implement a custom join between two 
tables:
+
+{{< tabs "2137eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+env.executeSql("CREATE VIEW Visits(name) AS VALUES ('Bob'), ('Alice'), 
('Bob')");
+env.executeSql("CREATE VIEW Purchases(customer, item) AS VALUES ('Alice', 
'milk')");
+
+env.createFunction("Greeting", GreetingWithLastPurchase.class);
+
+env
+  .executeSql("SELECT * FROM Greeting(TABLE Visits PARTITION BY name, TABLE 
Purchases PARTITION BY customer)")
+  .print();
+
+// --------------------
+// Function declaration
+// --------------------
+
+// Function that greets a customer and suggests the last purchase made, if 
available.
+public static class GreetingWithLastPurchase extends 
ProcessTableFunction<String> {
+
+  // Keep the last purchased item in state
+  public static class LastItemState {
+    public String lastItem;
+  }
+
+  // The eval() method takes two @ArgumentHint(TABLE_AS_SET) arguments
+  public void eval(
+      @StateHint LastItemState state,
+      @ArgumentHint(TABLE_AS_SET) Row visit,
+      @ArgumentHint(TABLE_AS_SET) Row purchase) {
+
+    // Process row from table Purchases
+    if (purchase != null) {
+      state.lastItem = purchase.getFieldAs("item");
+    }
+
+    // Process row from table Visits
+    else if (visit != null) {
+      if (state.lastItem == null) {
+        collect("Hello " + visit.getFieldAs("name") + ", let me know if I can 
help!");
+      } else {
+        collect("Hello " + visit.getFieldAs("name") + ", here to buy " + 
state.lastItem + " again?");
+      }
+    }
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The result will look similar to:
+
+```text
++----+--------------------------------+--------------------------------+--------------------------------+
+| op |                           name |                       customer |       
                  EXPR$0 |
++----+--------------------------------+--------------------------------+--------------------------------+
+| +I |                            Bob |                            Bob | Hello 
Bob, let me know if I... |
+| +I |                          Alice |                          Alice | Hello 
Alice, here to buy mi... |
+| +I |                            Bob |                            Bob | Hello 
Bob, let me know if I... |
++----+--------------------------------+--------------------------------+--------------------------------+
+```
+
+### Efficiency and Design Principles
+
+A high number of input tables can negatively impact a single TaskManager or 
subtask. Network buffers must be allocated
+for each input, resulting in increased memory consumption which is why the 
number of table arguments is limited to a
+maximum of 20 tables.
+
+Unevenly distributed keys may overload a single virtual processor, leading to 
backpressure. It is important to select
+appropriate partition keys.
+
+{{< top >}}
+
 Query Evolution with UIDs
 -------------------------
 
@@ -1061,7 +1162,8 @@ END;
 
 {{< top >}}
 
-## Pass-Through Columns
+Pass-Through Columns
+--------------------
 
 Depending on the table semantics and whether an `on_time` argument has been 
defined, the system adds additional columns for
 every function output.
@@ -1089,7 +1191,7 @@ With pass-through columns: | k | v | c1 | c2 |
 
 This allows the PTF to focus on the main aggregation without the need to 
manually forward input columns.
 
-*Note*: Timers are not available when pass-through columns are enabled.
+*Note*: Pass-through columns are only available for append-only PTFs that take a 
single table argument and don't use timers.
 
 {{< top >}}
 
@@ -1610,9 +1712,6 @@ while the PTF exists once in the pipeline.
 The following example shows how a PTF can be used for joining. Additionally, 
it also showcases how a PTF can be used as
 a data generator for creating bounded tables with dummy data.
 
-Because PTFs don't support multiple table arguments yet, we use `unionAll` to 
for passing multiple partitioned tables
-into the PTF. Because a union requires a unified schema, the data generators 
transform the data into a `UnifiedEvent`.
-
 {{< tabs "1637eeed-3d13-455c-8e2f-5e164da9f844" >}}
 {{< tab "Java" >}}
 ```java
@@ -1625,11 +1724,11 @@ TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMo
 Table orders = env.fromCall(OrderGenerator.class);
 Table payments = env.fromCall(PaymentGenerator.class);
 
-// Union orders and payments before
-// partitioning and passing them into the Joiner function
-Table joined = orders.unionAll(payments)
-  .partitionBy($("orderId"))
-  .process(Joiner.class);
+// Partition orders and payments and pass them into the Joiner function
+Table joined = env.fromCall(
+  Joiner.class,
+  orders.partitionBy($("id")).asArgument("order"),
+  payments.partitionBy($("orderId")).asArgument("payment"));
 
 joined.execute().print();
 
@@ -1637,24 +1736,8 @@ joined.execute().print();
 // Data Generation
 // ---------------------------
 
-// A unified event for all input tables.
-// One of the sides is al empty.
-public static class UnifiedEvent {
-  public int orderId;
-  public Order order;
-  public Payment payment;
-
-  public static UnifiedEvent of(int orderId, Order order, Payment payment) {
-    UnifiedEvent unifiedEvent = new UnifiedEvent();
-    unifiedEvent.orderId = orderId;
-    unifiedEvent.order = order;
-    unifiedEvent.payment = payment;
-    return unifiedEvent;
-  }
-}
-
 // A PTF that generates Orders
-public static class OrderGenerator extends ProcessTableFunction<UnifiedEvent> {
+public static class OrderGenerator extends ProcessTableFunction<Order> {
   public void eval() {
     Stream.of(
       Order.of("Bob", 1000001, 23.46, "USD"),
@@ -1662,13 +1745,12 @@ public static class OrderGenerator extends 
ProcessTableFunction<UnifiedEvent> {
       Order.of("Alice", 1000601, 0.79, "EUR"),
       Order.of("Charly", 1000703, 100.60, "EUR")
     )
-    .map(order -> UnifiedEvent.of(order.id, order, null))
     .forEach(this::collect);
   }
 }
 
 // A PTF that generates Payments
-public static class PaymentGenerator extends 
ProcessTableFunction<UnifiedEvent> {
+public static class PaymentGenerator extends ProcessTableFunction<Payment> {
   public void eval() {
     Stream.of(
       Payment.of(999997870, 1000001),
@@ -1676,7 +1758,6 @@ public static class PaymentGenerator extends 
ProcessTableFunction<UnifiedEvent>
       Payment.of(999993331, 1000021),
       Payment.of(999994111, 1000601)
     )
-    .map(payment -> UnifiedEvent.of(payment.orderId, null, payment))
     .forEach(this::collect);
   }
 }
@@ -1714,8 +1795,8 @@ public static class Payment {
 {{< /tab >}}
 {{< /tabs >}}
 
-After generating the data and performing the union, the stateful Joiner 
buffers events until a matching pair is
-found. Any duplicates in either of the input tables are ignored.
+After generating the data, the stateful Joiner buffers events until a matching 
pair is found. Any duplicates in either
+of the input tables are ignored.
 
 {{< tabs "1737eeed-3d13-455c-8e2f-5e164da9f844" >}}
 {{< tab "Java" >}}
@@ -1727,7 +1808,8 @@ public static class Joiner extends 
ProcessTableFunction<JoinResult> {
   public void eval(
     Context ctx,
     @StateHint(ttl = "1 hour") JoinResult seen,
-    @ArgumentHint(TABLE_AS_SET) UnifiedEvent input
+    @ArgumentHint(TABLE_AS_SET) Order order,
+    @ArgumentHint(TABLE_AS_SET) Payment payment
   ) {
     if (input.order != null) {
       if (seen.order != null) {
@@ -1767,19 +1849,18 @@ The output could look similar to the following. 
Duplicate events for payment `99
 for `Charly` could not be found.
 
 ```text
-+----+-------------+--------------------------------+--------------------------------+
-| op |     orderId |                          order |                        
payment |
-+----+-------------+--------------------------------+--------------------------------+
-| +I |     1000021 | (amount=6.99, currency=USD,... | (id=999993331, 
orderId=1000... |
-| +I |     1000601 | (amount=0.79, currency=EUR,... | (id=999994111, 
orderId=1000... |
-| +I |     1000001 | (amount=23.46, currency=USD... | (id=999997870, 
orderId=1000... |
-+----+-------------+--------------------------------+--------------------------------+
++----+-------------+-------------+--------------------------------+--------------------------------+
+| op |          id |     orderId |                          order |            
            payment |
++----+-------------+-------------+--------------------------------+--------------------------------+
+| +I |     1000021 |     1000021 | (amount=6.99, currency=USD,... | 
(id=999993331, orderId=1000... |
+| +I |     1000601 |     1000601 | (amount=0.79, currency=EUR,... | 
(id=999994111, orderId=1000... |
+| +I |     1000001 |     1000001 | (amount=23.46, currency=USD... | 
(id=999997870, orderId=1000... |
++----+-------------+-------------+--------------------------------+--------------------------------+
 ```
 
 Limitations
 -----------
 
 PTFs are in an early stage. The following limitations apply:
-- Multiple table arguments are not supported.
 - PTFs cannot run in batch mode.
 - Broadcast state
diff --git a/docs/content/docs/dev/table/functions/ptfs.md 
b/docs/content/docs/dev/table/functions/ptfs.md
index 104274b50ce..f3c94960533 100644
--- a/docs/content/docs/dev/table/functions/ptfs.md
+++ b/docs/content/docs/dev/table/functions/ptfs.md
@@ -1015,6 +1015,107 @@ not needed anymore via `Context#clearAllTimers()` or 
`TimeContext#clearTimer(Str
 
 {{< top >}}
 
+Multiple Tables
+---------------
+
+A PTF can process multiple tables simultaneously. This enables a variety of 
use cases, including:
+
+- Implementing **custom joins** that efficiently manage state.
+- Enriching the main table with information from dimension tables as **side 
inputs**.
+- Sending **control events** to the keyed virtual processor during runtime.
+
+The `eval()` method can specify multiple table arguments to support multiple 
inputs. All table arguments must be declared
+with set semantics and use consistent partitioning. In other words, the number 
of columns and their data types in the
+`PARTITION BY` clause must match across all involved table arguments.
+
+Rows from either input are passed to the function one at a time. Thus, only 
one table argument is non-null at a time. Use
+null checks to determine which input is currently being processed.
+
+{{< hint warning >}}
+The system decides which input row is streamed through the virtual processor 
next. If not handled properly in the PTF,
+this can lead to race conditions between inputs and, consequently, to 
non-deterministic results. It is recommended to
+design the function in such a way that the join is either time-based (i.e., 
waiting for all rows to arrive up to a given
+watermark) or condition-based, where the PTF buffers one or more input rows 
until a specific condition is met.
+{{< /hint >}}
+
+### Example: Custom Join
+
+The following example illustrates how to implement a custom join between two 
tables:
+
+{{< tabs "2137eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+env.executeSql("CREATE VIEW Visits(name) AS VALUES ('Bob'), ('Alice'), 
('Bob')");
+env.executeSql("CREATE VIEW Purchases(customer, item) AS VALUES ('Alice', 
'milk')");
+
+env.createFunction("Greeting", GreetingWithLastPurchase.class);
+
+env
+  .executeSql("SELECT * FROM Greeting(TABLE Visits PARTITION BY name, TABLE 
Purchases PARTITION BY customer)")
+  .print();
+
+// --------------------
+// Function declaration
+// --------------------
+
+// Function that greets a customer and suggests the last purchase made, if 
available.
+public static class GreetingWithLastPurchase extends 
ProcessTableFunction<String> {
+
+  // Keep the last purchased item in state
+  public static class LastItemState {
+    public String lastItem;
+  }
+
+  // The eval() method takes two @ArgumentHint(TABLE_AS_SET) arguments
+  public void eval(
+      @StateHint LastItemState state,
+      @ArgumentHint(TABLE_AS_SET) Row visit,
+      @ArgumentHint(TABLE_AS_SET) Row purchase) {
+
+    // Process row from table Purchases
+    if (purchase != null) {
+      state.lastItem = purchase.getFieldAs("item");
+    }
+
+    // Process row from table Visits
+    else if (visit != null) {
+      if (state.lastItem == null) {
+        collect("Hello " + visit.getFieldAs("name") + ", let me know if I can 
help!");
+      } else {
+        collect("Hello " + visit.getFieldAs("name") + ", here to buy " + 
state.lastItem + " again?");
+      }
+    }
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The result will look similar to:
+
+```text
++----+--------------------------------+--------------------------------+--------------------------------+
+| op |                           name |                       customer |       
                  EXPR$0 |
++----+--------------------------------+--------------------------------+--------------------------------+
+| +I |                            Bob |                            Bob | Hello 
Bob, let me know if I... |
+| +I |                          Alice |                          Alice | Hello 
Alice, here to buy mi... |
+| +I |                            Bob |                            Bob | Hello 
Bob, let me know if I... |
++----+--------------------------------+--------------------------------+--------------------------------+
+```
+
+### Efficiency and Design Principles
+
+A high number of input tables can negatively impact a single TaskManager or 
subtask. Network buffers must be allocated
+for each input, resulting in increased memory consumption which is why the 
number of table arguments is limited to a
+maximum of 20 tables.
+
+Unevenly distributed keys may overload a single virtual processor, leading to 
backpressure. It is important to select
+appropriate partition keys.
+
+{{< top >}}
+
 Query Evolution with UIDs
 -------------------------
 
@@ -1061,7 +1162,8 @@ END;
 
 {{< top >}}
 
-## Pass-Through Columns
+Pass-Through Columns
+--------------------
 
 Depending on the table semantics and whether an `on_time` argument has been 
defined, the system adds additional columns for
 every function output.
@@ -1089,7 +1191,7 @@ With pass-through columns: | k | v | c1 | c2 |
 
 This allows the PTF to focus on the main aggregation without the need to 
manually forward input columns.
 
-*Note*: Timers are not available when pass-through columns are enabled.
+*Note*: Pass-through columns are only available for append-only PTFs that take a 
single table argument and don't use timers.
 
 {{< top >}}
 
@@ -1610,9 +1712,6 @@ while the PTF exists once in the pipeline.
 The following example shows how a PTF can be used for joining. Additionally, 
it also showcases how a PTF can be used as
 a data generator for creating bounded tables with dummy data.
 
-Because PTFs don't support multiple table arguments yet, we use `unionAll` to 
for passing multiple partitioned tables
-into the PTF. Because a union requires a unified schema, the data generators 
transform the data into a `UnifiedEvent`.
-
 {{< tabs "1637eeed-3d13-455c-8e2f-5e164da9f844" >}}
 {{< tab "Java" >}}
 ```java
@@ -1625,11 +1724,11 @@ TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMo
 Table orders = env.fromCall(OrderGenerator.class);
 Table payments = env.fromCall(PaymentGenerator.class);
 
-// Union orders and payments before
-// partitioning and passing them into the Joiner function
-Table joined = orders.unionAll(payments)
-  .partitionBy($("orderId"))
-  .process(Joiner.class);
+// Partition orders and payments and pass them into the Joiner function
+Table joined = env.fromCall(
+  Joiner.class,
+  orders.partitionBy($("id")).asArgument("order"),
+  payments.partitionBy($("orderId")).asArgument("payment"));
 
 joined.execute().print();
 
@@ -1637,24 +1736,8 @@ joined.execute().print();
 // Data Generation
 // ---------------------------
 
-// A unified event for all input tables.
-// One of the sides is al empty.
-public static class UnifiedEvent {
-  public int orderId;
-  public Order order;
-  public Payment payment;
-
-  public static UnifiedEvent of(int orderId, Order order, Payment payment) {
-    UnifiedEvent unifiedEvent = new UnifiedEvent();
-    unifiedEvent.orderId = orderId;
-    unifiedEvent.order = order;
-    unifiedEvent.payment = payment;
-    return unifiedEvent;
-  }
-}
-
 // A PTF that generates Orders
-public static class OrderGenerator extends ProcessTableFunction<UnifiedEvent> {
+public static class OrderGenerator extends ProcessTableFunction<Order> {
   public void eval() {
     Stream.of(
       Order.of("Bob", 1000001, 23.46, "USD"),
@@ -1662,13 +1745,12 @@ public static class OrderGenerator extends 
ProcessTableFunction<UnifiedEvent> {
       Order.of("Alice", 1000601, 0.79, "EUR"),
       Order.of("Charly", 1000703, 100.60, "EUR")
     )
-    .map(order -> UnifiedEvent.of(order.id, order, null))
     .forEach(this::collect);
   }
 }
 
 // A PTF that generates Payments
-public static class PaymentGenerator extends 
ProcessTableFunction<UnifiedEvent> {
+public static class PaymentGenerator extends ProcessTableFunction<Payment> {
   public void eval() {
     Stream.of(
       Payment.of(999997870, 1000001),
@@ -1676,7 +1758,6 @@ public static class PaymentGenerator extends 
ProcessTableFunction<UnifiedEvent>
       Payment.of(999993331, 1000021),
       Payment.of(999994111, 1000601)
     )
-    .map(payment -> UnifiedEvent.of(payment.orderId, null, payment))
     .forEach(this::collect);
   }
 }
@@ -1714,8 +1795,8 @@ public static class Payment {
 {{< /tab >}}
 {{< /tabs >}}
 
-After generating the data and performing the union, the stateful Joiner 
buffers events until a matching pair is
-found. Any duplicates in either of the input tables are ignored.
+After generating the data, the stateful Joiner buffers events until a matching 
pair is found. Any duplicates in either
+of the input tables are ignored.
 
 {{< tabs "1737eeed-3d13-455c-8e2f-5e164da9f844" >}}
 {{< tab "Java" >}}
@@ -1727,7 +1808,8 @@ public static class Joiner extends 
ProcessTableFunction<JoinResult> {
   public void eval(
     Context ctx,
     @StateHint(ttl = "1 hour") JoinResult seen,
-    @ArgumentHint(TABLE_AS_SET) UnifiedEvent input
+    @ArgumentHint(TABLE_AS_SET) Order order,
+    @ArgumentHint(TABLE_AS_SET) Payment payment
   ) {
     if (input.order != null) {
       if (seen.order != null) {
@@ -1767,19 +1849,18 @@ The output could look similar to the following. 
Duplicate events for payment `99
 for `Charly` could not be found.
 
 ```text
-+----+-------------+--------------------------------+--------------------------------+
-| op |     orderId |                          order |                        
payment |
-+----+-------------+--------------------------------+--------------------------------+
-| +I |     1000021 | (amount=6.99, currency=USD,... | (id=999993331, 
orderId=1000... |
-| +I |     1000601 | (amount=0.79, currency=EUR,... | (id=999994111, 
orderId=1000... |
-| +I |     1000001 | (amount=23.46, currency=USD... | (id=999997870, 
orderId=1000... |
-+----+-------------+--------------------------------+--------------------------------+
++----+-------------+-------------+--------------------------------+--------------------------------+
+| op |          id |     orderId |                          order |            
            payment |
++----+-------------+-------------+--------------------------------+--------------------------------+
+| +I |     1000021 |     1000021 | (amount=6.99, currency=USD,... | 
(id=999993331, orderId=1000... |
+| +I |     1000601 |     1000601 | (amount=0.79, currency=EUR,... | 
(id=999994111, orderId=1000... |
+| +I |     1000001 |     1000001 | (amount=23.46, currency=USD... | 
(id=999997870, orderId=1000... |
++----+-------------+-------------+--------------------------------+--------------------------------+
 ```
 
 Limitations
 -----------
 
 PTFs are in an early stage. The following limitations apply:
-- Multiple table arguments are not supported.
 - PTFs cannot run in batch mode.
 - Broadcast state
diff --git 
a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html 
b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index ae2caa7aad8..7c3af56716e 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -77,6 +77,12 @@ ONE_PHASE: Enforce to use one stage aggregate which only has 
CompleteGlobalAggre
             <td><p>Enum</p></td>
             <td>When it is `TRY_RESOLVE`, the optimizer tries to resolve the 
correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog 
pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), 
Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only 
changelog pipeline. For updates, there are  three main NDU problems:<br />1. 
Non-deterministic functions, include scalar, table, aggregate functions, both 
builtin and custom ones [...]
         </tr>
+        <tr>
+            <td><h5>table.optimizer.ptf.max-tables</h5><br> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">20</td>
+            <td>Integer</td>
+            <td>The maximum number of table arguments for a Process Table 
Function (PTF). In theory, a PTF can accept an arbitrary number of input 
tables. In practice, however, each input requires reserving network buffers, 
which impacts memory usage. For this reason, the number of input tables is 
limited to 20.</td>
+        </tr>
         <tr>
             
<td><h5>table.optimizer.reuse-optimize-block-with-digest-enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java
index 9adb1280dd1..00da25b3ffa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java
@@ -45,6 +45,18 @@ public class KeyedMultipleInputTransformation<OUT>
         updateManagedMemoryStateBackendUseCase(true);
     }
 
+    public KeyedMultipleInputTransformation(
+            String name,
+            StreamOperatorFactory<OUT> operatorFactory,
+            TypeInformation<OUT> outputType,
+            int parallelism,
+            boolean parallelismConfigured,
+            TypeInformation<?> stateKeyType) {
+        super(name, operatorFactory, outputType, parallelism, 
parallelismConfigured);
+        this.stateKeyType = stateKeyType;
+        updateManagedMemoryStateBackendUseCase(true);
+    }
+
     public KeyedMultipleInputTransformation<OUT> addInput(
             Transformation<?> input, KeySelector<?, ?> keySelector) {
         inputs.add(input);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 62d085d5cd6..71577258819 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -362,6 +362,17 @@ public class OptimizerConfigOptions {
                                     + "it receives incremental accumulators 
and outputs incremental results). "
                                     + "In this way, we can reduce some state 
overhead and resources. Default is enabled.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Integer> TABLE_OPTIMIZER_PTF_MAX_TABLES =
+            key("table.optimizer.ptf.max-tables")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription(
+                            "The maximum number of table arguments for a 
Process Table Function (PTF). In theory, a PTF "
+                                    + "can accept an arbitrary number of input 
tables. In practice, however, each input "
+                                    + "requires reserving network buffers, 
which impacts memory usage. For this reason, "
+                                    + "the number of input tables is limited 
to 20.");
+
     /** Strategy for handling non-deterministic updates. */
     @PublicEvolving
     public enum NonDeterministicUpdateStrategy {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index a24379f4001..f5c585087dd 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -727,11 +727,6 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
             return -1;
         }
 
-        @Override
-        public List<String> coPartitionArgs() {
-            return List.of();
-        }
-
         @Override
         public Optional<ChangelogMode> changelogMode() {
             return Optional.empty();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
index 2e4829f8bb4..2468f6c7493 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
@@ -99,10 +99,8 @@ public enum ArgumentTrait {
      * With pass-through columns: | k | v | c1 | c2 |
      * </pre>
      *
-     * <p>In case of multiple table arguments, pass-through columns are added 
according to the
-     * declaration order in the PTF signature.
-     *
-     * <p>Timers are not available when pass-through columns are enabled.
+     * <p>Pass-through columns are only available for append-only PTFs that take 
a single table
+     * argument and don't use timers.
      *
      * <p>Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link 
#TABLE_AS_SET} arguments.
      */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
index 744484db2d2..df182cb0f20 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
@@ -32,8 +32,7 @@ import org.apache.flink.types.RowKind;
  *
  * <p>Note: This interface is intended for advanced use cases and should be 
implemented with care.
  * Emitting an incorrect changelog from the PTF may lead to undefined behavior 
in the overall query.
- * Many features such as the `on_time` argument and pass-through columns are 
not available for
- * updating PTFs.
+ * The `on_time` argument is unsupported for updating PTFs.
  *
  * <p>The resulting changelog mode can be influenced by:
  *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
index 6119b6ba791..4ff04789aa8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.types.DataType;
 
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -106,15 +105,6 @@ public interface TableSemantics {
      */
     int timeColumn();
 
-    /**
-     * Returns information about which passed tables are co-partitioned with 
the passed table.
-     * Applies only to table arguments with set semantics.
-     *
-     * @return List of table argument names (not table names!) that are 
co-partitioned with the
-     *     passed table.
-     */
-    List<String> coPartitionArgs();
-
     /**
      * Actual changelog mode for the passed table. By default, table arguments 
take only {@link
      * ChangelogMode#insertOnly()}. They are able to take tables of other 
changelog modes, if
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index 0dbdc20ae82..aac34f546de 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
@@ -43,11 +44,11 @@ import org.apache.flink.types.ColumnList;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
@@ -143,7 +144,7 @@ public class SystemTypeInference {
 
         checkReservedArgs(declaredArgs);
         checkMultipleTableArgs(declaredArgs);
-        checkUpdatingPassThroughColumns(declaredArgs);
+        checkPassThroughColumns(declaredArgs);
 
         final List<StaticArgument> newStaticArgs = new 
ArrayList<>(declaredArgs);
         newStaticArgs.addAll(PROCESS_TABLE_FUNCTION_SYSTEM_ARGS);
@@ -166,22 +167,31 @@ public class SystemTypeInference {
     }
 
     private static void checkMultipleTableArgs(List<StaticArgument> 
staticArgs) {
-        if (staticArgs.stream().filter(arg -> 
arg.is(StaticArgumentTrait.TABLE)).count() > 1) {
+        if (staticArgs.stream().filter(arg -> 
arg.is(StaticArgumentTrait.TABLE)).count() <= 1) {
+            return;
+        }
+        if (staticArgs.stream().anyMatch(arg -> 
!arg.is(StaticArgumentTrait.TABLE_AS_SET))) {
             throw new ValidationException(
-                    "Currently, only signatures with at most one table 
argument are supported.");
+                    "All table arguments must use set semantics if multiple 
table arguments are declared.");
         }
     }
 
-    private static void checkUpdatingPassThroughColumns(List<StaticArgument> 
staticArgs) {
+    private static void checkPassThroughColumns(List<StaticArgument> 
staticArgs) {
         final Set<StaticArgumentTrait> traits =
                 staticArgs.stream()
                         .flatMap(arg -> arg.getTraits().stream())
                         .collect(Collectors.toSet());
-        if (traits.contains(StaticArgumentTrait.SUPPORT_UPDATES)
-                && traits.contains(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
+        if (!traits.contains(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
+            return;
+        }
+        if (traits.contains(StaticArgumentTrait.SUPPORT_UPDATES)) {
             throw new ValidationException(
                     "Signatures with updating inputs must not pass columns 
through.");
         }
+        if (staticArgs.stream().filter(arg -> 
arg.is(StaticArgumentTrait.TABLE)).count() > 1) {
+            throw new ValidationException(
+                    "Pass-through columns are not supported if multiple table 
arguments are declared.");
+        }
     }
 
     private static InputTypeStrategy deriveSystemInputStrategy(
@@ -323,37 +333,44 @@ public class SystemTypeInference {
 
             final Set<String> usedOnTimeFields = new HashSet<>();
 
-            final List<LogicalType> onTimeColumns =
-                    IntStream.range(0, staticArgs.size())
-                            .mapToObj(
-                                    pos -> {
-                                        final StaticArgument staticArg = 
staticArgs.get(pos);
-                                        if 
(!staticArg.is(StaticArgumentTrait.TABLE)) {
-                                            return null;
-                                        }
-                                        final RowType rowType =
-                                                LogicalTypeUtils.toRowType(
-                                                        
args.get(pos).getLogicalType());
-                                        final int onTimeColumn =
-                                                findUniqueOnTimeColumn(
-                                                        staticArg.getName(), 
rowType, onTimeFields);
-                                        if (onTimeColumn >= 0) {
-                                            usedOnTimeFields.add(
-                                                    
rowType.getFieldNames().get(onTimeColumn));
-                                            return 
rowType.getTypeAt(onTimeColumn);
-                                        }
-                                        if 
(staticArg.is(StaticArgumentTrait.REQUIRE_ON_TIME)) {
-                                            throw new ValidationException(
-                                                    String.format(
-                                                            "Table argument 
'%s' requires a time attribute. "
-                                                                    + "Please 
provide one using the implicit `on_time` argument. "
-                                                                    + "For 
example: myFunction(..., on_time => DESCRIPTOR(`my_timestamp`)",
-                                                            
staticArg.getName()));
-                                        }
-                                        return null;
-                                    })
-                            .filter(Objects::nonNull)
-                            .collect(Collectors.toList());
+            final List<LogicalType> onTimeColumns = new ArrayList<>();
+            final List<String> missingOnTimeColumns = new ArrayList<>();
+            IntStream.range(0, staticArgs.size())
+                    .forEach(
+                            pos -> {
+                                final StaticArgument staticArg = 
staticArgs.get(pos);
+                                if (!staticArg.is(StaticArgumentTrait.TABLE)) {
+                                    return;
+                                }
+                                final RowType rowType =
+                                        
LogicalTypeUtils.toRowType(args.get(pos).getLogicalType());
+                                final int onTimeColumn =
+                                        findUniqueOnTimeColumn(
+                                                staticArg.getName(), rowType, 
onTimeFields);
+                                if (onTimeColumn >= 0) {
+                                    
usedOnTimeFields.add(rowType.getFieldNames().get(onTimeColumn));
+                                    
onTimeColumns.add(rowType.getTypeAt(onTimeColumn));
+                                    return;
+                                }
+                                if 
(staticArg.is(StaticArgumentTrait.REQUIRE_ON_TIME)) {
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "Table argument '%s' 
requires a time attribute. "
+                                                            + "Please provide 
one using the implicit `on_time` argument. "
+                                                            + "For example: 
myFunction(..., on_time => DESCRIPTOR(`my_timestamp`)",
+                                                    staticArg.getName()));
+                                } else {
+                                    
missingOnTimeColumns.add(staticArg.getName());
+                                }
+                            });
+
+            if (!onTimeColumns.isEmpty() && !missingOnTimeColumns.isEmpty()) {
+                throw new ValidationException(
+                        "Invalid time attribute declaration. If multiple 
tables are declared, the `on_time` argument "
+                                + "must reference a time column for each table 
argument or none. "
+                                + "Missing time attributes for: "
+                                + missingOnTimeColumns);
+            }
 
             final Set<String> unusedOnTimeFields = new HashSet<>(onTimeFields);
             unusedOnTimeFields.removeAll(usedOnTimeFields);
@@ -494,7 +511,7 @@ public class SystemTypeInference {
             }
 
             try {
-                checkTableArgTraits(staticArgs, callContext);
+                checkTableArgs(staticArgs, callContext);
                 checkUidArg(callContext);
             } catch (ValidationException e) {
                 return callContext.fail(throwOnFailure, e.getMessage());
@@ -527,8 +544,9 @@ public class SystemTypeInference {
             }
         }
 
-        private static void checkTableArgTraits(
+        private static void checkTableArgs(
                 List<StaticArgument> staticArgs, CallContext callContext) {
+            final List<TableSemantics> tableSemantics = new ArrayList<>();
             IntStream.range(0, staticArgs.size())
                     .forEach(
                             pos -> {
@@ -546,7 +564,46 @@ public class SystemTypeInference {
                                 }
                                 checkRowSemantics(staticArg, semantics);
                                 checkSetSemantics(staticArg, semantics);
+                                tableSemantics.add(semantics);
                             });
+            checkCoPartitioning(tableSemantics);
+        }
+
+        private static void checkCoPartitioning(List<TableSemantics> 
tableSemantics) {
+            if (tableSemantics.isEmpty()) {
+                return;
+            }
+            final List<LogicalType> partitioningTypes =
+                    tableSemantics.stream()
+                            .map(
+                                    semantics -> {
+                                        final LogicalType tableType =
+                                                
semantics.dataType().getLogicalType();
+                                        final List<LogicalType> fieldTypes =
+                                                
LogicalTypeChecks.getFieldTypes(tableType);
+                                        final LogicalType[] partitionTypes =
+                                                
Arrays.stream(semantics.partitionByColumns())
+                                                        
.mapToObj(fieldTypes::get)
+                                                        
.toArray(LogicalType[]::new);
+                                        return (LogicalType) 
RowType.of(partitionTypes);
+                                    })
+                            .collect(Collectors.toList());
+            final LogicalType commonType =
+                    
LogicalTypeMerging.findCommonType(partitioningTypes).orElse(null);
+            if (commonType == null
+                    || partitioningTypes.stream()
+                            .anyMatch(
+                                    partitioningType ->
+                                            
!LogicalTypeCasts.supportsAvoidingCast(
+                                                    partitioningType, 
commonType))) {
+                throw new ValidationException(
+                        "Invalid PARTITION BY columns. The number of columns 
and their data types must match "
+                                + "across all involved table arguments. Given 
partition key sets: "
+                                + partitioningTypes.stream()
+                                        .map(LogicalType::getChildren)
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(", ")));
+            }
         }
 
         private static void checkRowSemantics(StaticArgument staticArg, 
TableSemantics semantics) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index f5fb5c62f22..091fcae385d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -256,11 +256,6 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
             return -1;
         }
 
-        @Override
-        public List<String> coPartitionArgs() {
-            return List.of();
-        }
-
         @Override
         public Optional<ChangelogMode> changelogMode() {
             return Optional.empty();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index dd1b9d45695..cd226f9b417 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -317,11 +317,6 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
             return timeColumn;
         }
 
-        @Override
-        public List<String> coPartitionArgs() {
-            return List.of();
-        }
-
         @Override
         public Optional<ChangelogMode> changelogMode() {
             return Optional.ofNullable(changelogMode);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 9b39f4d549b..08ebe72f9e4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -302,7 +302,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
             final RelDataType outputRelDataType =
                     typeFactory.buildRelNodeRowType((RowType) outputType);
 
-            final List<RelNode> inputs = new ArrayList<>();
+            final List<RelNode> inputStack = new ArrayList<>();
             final List<RexNode> rexNodeArgs =
                     resolvedArgs.stream()
                             .map(
@@ -329,16 +329,19 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                                             final RexTableArgCall tableArgCall 
=
                                                     new RexTableArgCall(
                                                             rowType,
-                                                            inputs.size(),
+                                                            inputStack.size(),
                                                             partitionKeys,
                                                             new int[0]);
-                                            inputs.add(relBuilder.build());
+                                            inputStack.add(relBuilder.build());
                                             return tableArgCall;
                                         }
                                         return 
convertExprToRexNode(resolvedArg);
                                     })
                             .collect(Collectors.toList());
 
+            // relBuilder.build() works in LIFO fashion, this restores the 
original input order
+            Collections.reverse(inputStack);
+
             final BridgingSqlFunction sqlFunction =
                     BridgingSqlFunction.of(relBuilder.getCluster(), 
contextFunction);
 
@@ -349,7 +352,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
             final RelNode functionScan =
                     LogicalTableFunctionScan.create(
                             relBuilder.getCluster(),
-                            inputs,
+                            inputStack,
                             call,
                             null,
                             outputRelDataType,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index bb607b8d65c..4fa75ee2d6e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -20,10 +20,11 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import 
org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.ProcessTableFunction;
@@ -166,10 +167,6 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
                 getInputEdges().stream()
                         .map(e -> (Transformation<RowData>) 
e.translateToPlan(planner))
                         .collect(Collectors.toList());
-        if (inputTransforms.size() != 1) {
-            throw new TableException("Process table function only supports 
exactly one input.");
-        }
-        final Transformation<RowData> inputTransform = inputTransforms.get(0);
 
         final List<Ord<StaticArgument>> providedInputArgs =
                 
StreamPhysicalProcessTableFunction.getProvidedInputArgs(invocation);
@@ -224,20 +221,12 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
                         .map(t -> 
EqualiserCodeGenerator.generateRowEquals(ctx, t, "StateEquals"))
                         .toArray(GeneratedRecordEqualiser[]::new);
 
-        final RuntimeTableSemantics singleTableSemantics;
-        if (runtimeTableSemantics.isEmpty()) {
-            // For constant function calls
-            singleTableSemantics = null;
-        } else {
-            singleTableSemantics = runtimeTableSemantics.get(0);
-        }
-
         final RuntimeChangelogMode producedChangelogMode =
                 RuntimeChangelogMode.serialize(outputChangelogMode);
 
         final ProcessTableOperatorFactory operatorFactory =
                 new ProcessTableOperatorFactory(
-                        singleTableSemantics,
+                        runtimeTableSemantics,
                         runtimeStateInfos,
                         generatedRunner,
                         stateHashCode,
@@ -253,24 +242,17 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
                         createTransformationName(config),
                         createTransformationDescription(config));
 
-        final OneInputTransformation<RowData, RowData> transform =
-                ExecNodeUtil.createOneInputTransformation(
-                        inputTransform,
-                        metadata,
-                        operatorFactory,
-                        InternalTypeInfo.of(getOutputType()),
-                        inputTransform.getParallelism(),
-                        false);
-
-        // For one input (but non-constant) functions with set semantics
-        if (singleTableSemantics != null && 
singleTableSemantics.hasSetSemantics()) {
-            final RowDataKeySelector selector =
-                    KeySelectorUtil.getRowDataSelector(
-                            planner.getFlinkContext().getClassLoader(),
-                            singleTableSemantics.partitionByColumns(),
-                            (InternalTypeInfo<RowData>) 
inputTransform.getOutputType());
-            transform.setStateKeySelector(selector);
-            transform.setStateKeyType(selector.getProducedType());
+        final Transformation<RowData> transform;
+        if 
(runtimeTableSemantics.stream().anyMatch(RuntimeTableSemantics::hasSetSemantics))
 {
+            transform =
+                    createKeyedTransformation(
+                            inputTransforms,
+                            metadata,
+                            operatorFactory,
+                            planner,
+                            runtimeTableSemantics);
+        } else {
+            transform = createNonKeyedTransformation(inputTransforms, 
metadata, operatorFactory);
         }
 
         if (inputsContainSingleton()) {
@@ -306,6 +288,57 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
                 timeColumn);
     }
 
+    private Transformation<RowData> createKeyedTransformation(
+            List<Transformation<RowData>> inputTransforms,
+            TransformationMetadata metadata,
+            ProcessTableOperatorFactory operatorFactory,
+            PlannerBase planner,
+            List<RuntimeTableSemantics> runtimeTableSemantics) {
+        assert runtimeTableSemantics.size() == inputTransforms.size();
+
+        final List<KeySelector<RowData, RowData>> keySelectors =
+                runtimeTableSemantics.stream()
+                        .map(
+                                inputSemantics ->
+                                        KeySelectorUtil.getRowDataSelector(
+                                                
planner.getFlinkContext().getClassLoader(),
+                                                
inputSemantics.partitionByColumns(),
+                                                (InternalTypeInfo<RowData>)
+                                                        inputTransforms
+                                                                
.get(inputSemantics.getInputIndex())
+                                                                
.getOutputType()))
+                        .collect(Collectors.toList());
+
+        final KeyedMultipleInputTransformation<RowData> transform =
+                ExecNodeUtil.createKeyedMultiInputTransformation(
+                        inputTransforms,
+                        keySelectors,
+                        ((RowDataKeySelector) 
keySelectors.get(0)).getProducedType(),
+                        metadata,
+                        operatorFactory,
+                        InternalTypeInfo.of(getOutputType()),
+                        inputTransforms.get(0).getParallelism(),
+                        false);
+
+        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+        return transform;
+    }
+
+    private Transformation<RowData> createNonKeyedTransformation(
+            List<Transformation<RowData>> inputTransforms,
+            TransformationMetadata metadata,
+            ProcessTableOperatorFactory operatorFactory) {
+        final Transformation<RowData> inputTransform = inputTransforms.get(0);
+        return ExecNodeUtil.createOneInputTransformation(
+                inputTransform,
+                metadata,
+                operatorFactory,
+                InternalTypeInfo.of(getOutputType()),
+                inputTransform.getParallelism(),
+                false);
+    }
+
     private static RuntimeStateInfo createRuntimeStateInfo(
             String name, StateInfo stateInfo, ExecNodeConfig config) {
         return new RuntimeStateInfo(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
index be18acc3d71..9d39cf8e02e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
@@ -21,11 +21,13 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
 import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
@@ -38,6 +40,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** An Utility class that helps translating {@link ExecNode} to {@link 
Transformation}. */
 public class ExecNodeUtil {
@@ -359,6 +362,33 @@ public class ExecNodeUtil {
                 parallelismConfigured);
     }
 
+    /** Create a {@link KeyedMultipleInputTransformation}. */
+    public static <I, K, O> KeyedMultipleInputTransformation<O> 
createKeyedMultiInputTransformation(
+            List<Transformation<I>> inputs,
+            List<KeySelector<I, K>> keySelectors,
+            TypeInformation<?> keyType,
+            TransformationMetadata transformationMeta,
+            StreamOperatorFactory<O> operatorFactory,
+            TypeInformation<O> outputType,
+            int parallelism,
+            boolean parallelismConfigured) {
+        final KeyedMultipleInputTransformation<O> transformation =
+                new KeyedMultipleInputTransformation<>(
+                        transformationMeta.getName(),
+                        operatorFactory,
+                        outputType,
+                        parallelism,
+                        parallelismConfigured,
+                        keyType);
+        transformationMeta.fill(transformation);
+        IntStream.range(0, inputs.size())
+                .forEach(
+                        inputIdx ->
+                                transformation.addInput(
+                                        inputs.get(inputIdx), 
keySelectors.get(inputIdx)));
+        return transformation;
+    }
+
     /** Return description for multiple input node. */
     public static String getMultipleInputDescription(
             ExecNode<?> rootNode,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index daf455688c2..6e5acf7efbf 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -17,8 +17,9 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream;
 
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -41,6 +42,8 @@ import 
org.apache.flink.table.types.inference.StaticArgumentTrait;
 import org.apache.flink.table.types.inference.SystemTypeInference;
 import org.apache.flink.types.RowKind;
 
+import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableSet;
+
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
@@ -64,6 +67,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -100,6 +104,7 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
         this.rowType = rowType;
         this.scan = scan;
         this.uid = deriveUniqueIdentifier(scan);
+        verifyInputSize(ShortcutUtils.unwrapTableConfig(cluster), 
inputs.size());
     }
 
     public StreamPhysicalProcessTableFunction(
@@ -161,7 +166,8 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
                         .orElseThrow(IllegalStateException::new);
         final RexCall call = (RexCall) scan.getCall();
         verifyTimeAttributes(getInputs(), call, inputChangelogModes, 
outputChangelogMode);
-        verifyPassThroughColumnsForUpdates(call, outputChangelogMode);
+        final List<Ord<StaticArgument>> providedInputArgs = 
getProvidedInputArgs(call);
+        verifyPassThroughColumnsForUpdates(providedInputArgs, 
outputChangelogMode);
         return new StreamExecProcessTableFunction(
                 unwrapTableConfig(this),
                 getInputs().stream().map(i -> 
InputProperty.DEFAULT).collect(Collectors.toList()),
@@ -283,15 +289,25 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
     }
 
     private static void verifyPassThroughColumnsForUpdates(
-            RexCall call, ChangelogMode outputChangelogMode) {
-        if (!outputChangelogMode.containsOnly(RowKind.INSERT)
-                && getProvidedInputArgs(call).stream()
+            List<Ord<StaticArgument>> providedInputArgs, ChangelogMode 
requiredChangelogMode) {
+        if (!requiredChangelogMode.containsOnly(RowKind.INSERT)
+                && providedInputArgs.stream()
                         .anyMatch(arg -> 
arg.e.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH))) {
             throw new ValidationException(
                     "Pass-through columns are not supported for PTFs that 
produce updates.");
         }
     }
 
+    private static void verifyInputSize(TableConfig tableConfig, int 
providedInputArgs) {
+        final int maxCount = 
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_PTF_MAX_TABLES);
+        if (providedInputArgs > maxCount) {
+            throw new ValidationException(
+                    String.format(
+                            "Unsupported table argument count. Currently, the 
number of input tables is limited to %s.",
+                            maxCount));
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Shared utilities
     // 
--------------------------------------------------------------------------------------------
@@ -414,26 +430,40 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
                 .collect(Collectors.toList());
     }
 
-    public static ImmutableBitSet toPartitionColumns(RexCall call) {
+    public static Set<ImmutableBitSet> toPartitionColumns(RexCall call) {
         final List<RexNode> operands = call.getOperands();
         final List<Ord<StaticArgument>> providedInputArgs =
                 StreamPhysicalProcessTableFunction.getProvidedInputArgs(call);
-        if (providedInputArgs.size() > 1) {
-            throw new TableException("More than one table input is not 
supported yet.");
-        }
-        final List<Integer> partitionColumns = new ArrayList<>();
+        final Set<ImmutableBitSet> partitionColumnsPerArg = new HashSet<>();
+        int pos = 0;
         for (Ord<StaticArgument> providedInputArg : providedInputArgs) {
             final RexTableArgCall tableArgCall = (RexTableArgCall) 
operands.get(providedInputArg.i);
             if 
(providedInputArg.e.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
-                // Output preserved key positions
-                
Arrays.stream(tableArgCall.getPartitionKeys()).forEach(partitionColumns::add);
+                // System type inference ensures that at most one table
+                // argument can pass columns through. In that case, the
+                // output preserves the position of partition columns.
+                // f(t(c1, c2, k1, k2, c3) PARTITION BY (k1, k2))
+                // -> [c1, c2, k1, k2, c3, function out...]
+                assert providedInputArgs.size() == 1;
+                final List<Integer> partitionColumns =
+                        Arrays.stream(tableArgCall.getPartitionKeys())
+                                .boxed()
+                                .collect(Collectors.toList());
+                
partitionColumnsPerArg.add(ImmutableBitSet.of(partitionColumns));
             } else {
-                // Output is prefixed with partition keys only
-                IntStream.range(0, tableArgCall.getPartitionKeys().length)
-                        .forEach(partitionColumns::add);
+                final int partitionKeyCount = 
tableArgCall.getPartitionKeys().length;
+                // Output is prefixed with the keys of all args, so offset by pos:
+                // f(t1 PARTITION BY (k1, k2), t2 PARTITION BY (k3, k4))
+                // -> [k1, k2, k3, k4, function out...]
+                final List<Integer> partitionColumns =
+                        IntStream.range(pos, pos + partitionKeyCount)
+                                .boxed()
+                                .collect(Collectors.toList());
+                pos += partitionKeyCount;
+                
partitionColumnsPerArg.add(ImmutableBitSet.of(partitionColumns));
             }
         }
-        return ImmutableBitSet.of(partitionColumns);
+        return ImmutableSet.copyOf(partitionColumnsPerArg);
     }
 
     public static CallContext toCallContext(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
index 04f3af55ffd..4f1d8d65d41 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
@@ -34,7 +34,6 @@ import 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil
 import 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil.{verifyFunctionAwareOutputType,
 DefaultExpressionEvaluatorFactory}
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
-import 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.dataview.DataViewUtils
@@ -43,17 +42,15 @@ import 
org.apache.flink.table.runtime.dataview.StateMapView.KeyedStateMapViewWit
 import org.apache.flink.table.runtime.generated.{GeneratedProcessTableRunner, 
ProcessTableRunner}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.extraction.ExtractionUtils
-import org.apache.flink.table.types.inference.{StaticArgument, 
StaticArgumentTrait, SystemTypeInference, TypeInferenceUtil}
+import org.apache.flink.table.types.inference.TypeInferenceUtil
 import org.apache.flink.table.types.inference.TypeInferenceUtil.StateInfo
 import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.types.Row
 
-import org.apache.calcite.rex.{RexCall, RexCallBinding, RexNode, RexUtil}
-import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 import java.util
-import java.util.Collections
 
 import scala.collection.JavaConverters._
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 80a8c8608a3..2a082de0526 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -664,8 +664,7 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
         if (isUpsert) {
           // Upsert PTFs use the partition keys as upsert keys,
           // thus the keys are unique
-          val partitionColumns = 
StreamPhysicalProcessTableFunction.toPartitionColumns(rel.getCall)
-          ImmutableSet.of(partitionColumns)
+          StreamPhysicalProcessTableFunction.toPartitionColumns(rel.getCall)
         } else {
           null
         }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 0560bcedc58..e84edf8ad2e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1529,7 +1529,6 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val changelogMode = 
changelogFunction.getChangelogMode(changelogContext)
         if (!changelogMode.containsOnly(RowKind.INSERT)) {
           verifyPtfTableArgsForUpdates(call)
-          verifyPtfRequirementsForUpdates(call, requiredChangelogMode, 
changelogMode)
         }
         toTraitSet(changelogMode)
       case _ =>
@@ -1551,16 +1550,4 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           }
       }
   }
-
-  private def verifyPtfRequirementsForUpdates(
-      call: RexCall,
-      required: ChangelogMode,
-      returned: ChangelogMode): Unit = {
-    if (!required.keyOnlyDeletes() && returned.keyOnlyDeletes()) {
-      throw new ValidationException(
-        s"Unsupported changelog mode returned from PTF 
'${call.getOperator.toString}'. " +
-          s"The system requires that deletions include all fields in DELETE 
messages. " +
-          s"Key-only deletes are not sufficient.")
-    }
-  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 7140a578f45..341c3c8a9db 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -82,6 +82,9 @@ public class ProcessTableFunctionSemanticTests extends 
SemanticTestBase {
                 
ProcessTableFunctionTestPrograms.PROCESS_INVALID_TABLE_AS_ROW_TIMERS,
                 
ProcessTableFunctionTestPrograms.PROCESS_INVALID_PASS_THROUGH_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_LIST_STATE,
-                ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE);
+                ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE,
+                ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT,
+                
ProcessTableFunctionTestPrograms.PROCESS_STATEFUL_MULTI_INPUT_WITH_TIMEOUT,
+                ProcessTableFunctionTestPrograms.PROCESS_UPDATING_MULTI_INPUT);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 13c8f1bfb27..59a861289d8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.LateTimersFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ListStateFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MapStateFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiStateFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NamedTimersFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.OptionalOnTimeFunction;
@@ -55,9 +56,11 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TableAsSetUpdatingArgFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TimeConversionsFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TimeToLiveStateFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TimedJoinFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedTableAsRowFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedTableAsSetFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UnnamedTimersFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingJoinFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingRetractFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingUpsertFunction;
 import org.apache.flink.table.test.program.SinkTestStep;
@@ -74,11 +77,15 @@ import static 
org.apache.flink.table.api.Expressions.descriptor;
 import static org.apache.flink.table.api.Expressions.lit;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.BASE_SINK_SCHEMA;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.BASIC_VALUES;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.CITY_VALUES;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.KEYED_BASE_SINK_SCHEMA;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.KEYED_TIMED_BASE_SINK_SCHEMA;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MULTI_BASE_SINK_SCHEMA;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MULTI_VALUES;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PASS_THROUGH_BASE_SINK_SCHEMA;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TIMED_BASE_SINK_SCHEMA;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TIMED_CITY_SOURCE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TIMED_MULTI_BASE_SINK_SCHEMA;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TIMED_SOURCE;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TIMED_SOURCE_LATE_EVENTS;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UPDATING_VALUES;
@@ -1216,4 +1223,106 @@ public class ProcessTableFunctionTestPrograms {
                                     .build())
                     .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name)")
                     .build();
+
+    public static final TableTestProgram PROCESS_MULTI_INPUT =
+            TableTestProgram.of("process-multi-input", "takes multiple tables")
+                    .setupTemporarySystemFunction("f", 
MultiInputFunction.class)
+                    .setupSql(MULTI_VALUES)
+                    .setupSql(CITY_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(MULTI_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, Bob, {+I[Bob, 12], 
null}]",
+                                            "+I[Bob, Bob, {null, +I[Bob, 
London]}]",
+                                            "+I[Alice, Alice, {+I[Alice, 42], 
null}]",
+                                            "+I[Alice, Alice, {null, +I[Alice, 
Berlin]}]",
+                                            "+I[Bob, Bob, {+I[Bob, 99], 
null}]",
+                                            "+I[Charly, Charly, {null, 
+I[Charly, Paris]}]",
+                                            "+I[Bob, Bob, {+I[Bob, 100], 
null}]",
+                                            "+I[Alice, Alice, {+I[Alice, 400], 
null}]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f(in1 => TABLE t 
PARTITION BY name, in2 => TABLE city PARTITION BY name)")
+                    .build();
+
+    public static final TableTestProgram 
PROCESS_STATEFUL_MULTI_INPUT_WITH_TIMEOUT =
+            TableTestProgram.of(
+                            "process-stateful-multi-input-with-timeout",
+                            "joins two tables and emits the left side after a 
timeout if there is no right side")
+                    .setupTemporarySystemFunction("f", TimedJoinFunction.class)
+                    .setupTableSource(TIMED_SOURCE)
+                    .setupTableSource(TIMED_CITY_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(TIMED_MULTI_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, Bob, 1 score in city 
London, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, Bob, 2 score in city 
London, 1970-01-01T00:00:00.002Z]",
+                                            "+I[Bob, Bob, 3 score in city 
London, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, Bob, 4 score in city 
London, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, Bob, 5 score in city 
London, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, Bob, 6 score in city 
London, 1970-01-01T00:00:00.006Z]",
+                                            "+I[Alice, Alice, no city found 
for score 1, 1970-01-01T00:00:01.001Z]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f("
+                                    + "scoreTable => TABLE t PARTITION BY 
name, "
+                                    + "cityTable => TABLE city PARTITION BY 
name, "
+                                    + "on_time => DESCRIPTOR(ts))")
+                    .build();
+
+    public static final TableTestProgram PROCESS_UPDATING_MULTI_INPUT =
+            TableTestProgram.of(
+                            "process-updating-multi-input",
+                            "joins two tables with input and output updates")
+                    .setupTemporarySystemFunction("f", 
UpdatingJoinFunction.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("scores")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "score INT NOT NULL")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
5),
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 2),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Bob", 3),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
2),
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", null))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("city")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "city STRING NOT NULL")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
"London"),
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", "Zurich"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Bob", "Berlin"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "`name` STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "`out` STRING")
+                                    .addOption("sink-changelog-mode-enforced", 
"I,UA,D")
+                                    .addOption("sink.supports-delete-by-key", 
"true")
+                                    .consumedValues(
+                                            "+I[Bob, score 5 in city London]",
+                                            "+I[Alice, score 2 in city 
Zurich]",
+                                            "+U[Bob, score 3 in city London]",
+                                            "+U[Bob, score 3 in city Berlin]",
+                                            "-D[Bob, null]",
+                                            "+I[Bob, score 2 in city Berlin]",
+                                            "-D[Alice, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT `name`, `out` FROM f("
+                                    + "scoreTable => TABLE scores PARTITION BY 
name, "
+                                    + "cityTable => TABLE city PARTITION BY 
name)")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 110297904eb..5082ebda06b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.ArgumentTrait;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -30,7 +31,7 @@ import org.apache.flink.table.functions.ChangelogFunction;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableSemantics;
-import org.apache.flink.table.runtime.operators.process.ProcessTableOperator;
+import 
org.apache.flink.table.runtime.operators.process.AbstractProcessTableOperator.RunnerContext;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.Row;
@@ -67,6 +68,20 @@ public class ProcessTableFunctionTestUtils {
             "CREATE VIEW t AS SELECT * FROM "
                     + "(VALUES ('Bob', 12), ('Alice', 42), ('Bob', 99), 
('Bob', 100), ('Alice', 400)) AS T(name, score)";
 
+    public static final String CITY_VALUES =
+            "CREATE VIEW city AS SELECT * FROM "
+                    + "(VALUES ('Bob', 'London'), ('Alice', 'Berlin'), 
('Charly', 'Paris')) AS T(name, city)";
+
+    public static final SourceTestStep TIMED_CITY_SOURCE =
+            SourceTestStep.newBuilder("city")
+                    .addSchema(
+                            "name STRING",
+                            "city STRING",
+                            "ts TIMESTAMP_LTZ(3)",
+                            "WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND")
+                    .producedValues(Row.of("Bob", "London", 
Instant.ofEpochMilli(0)))
+                    .build();
+
     public static final String UPDATING_VALUES =
             "CREATE VIEW t AS SELECT name, COUNT(*) FROM "
                     + "(VALUES ('Bob', 12), ('Alice', 42), ('Bob', 14)) AS 
T(name, score) "
@@ -119,6 +134,18 @@ public class ProcessTableFunctionTestUtils {
     public static final List<String> KEYED_BASE_SINK_SCHEMA =
             List.of("`name` STRING", "`out` STRING");
 
+    /** Corresponds to {@link AppendProcessTableFunctionBase}. */
+    public static final List<String> MULTI_BASE_SINK_SCHEMA =
+            List.of("`name` STRING", "`name0` STRING", "`out` STRING");
+
+    /** Corresponds to {@link AppendProcessTableFunctionBase}. */
+    public static final List<String> TIMED_MULTI_BASE_SINK_SCHEMA =
+            List.of(
+                    "`name` STRING",
+                    "`name0` STRING",
+                    "`out` STRING",
+                    "`rowtime` TIMESTAMP_LTZ(3)");
+
     /** Corresponds to {@link AppendProcessTableFunctionBase}. */
     public static final List<String> PASS_THROUGH_BASE_SINK_SCHEMA =
             List.of("`name` STRING", "`score` INT", "`out` STRING");
@@ -377,8 +404,7 @@ public class ProcessTableFunctionTestUtils {
                 @StateHint(ttl = "0") Score s2,
                 @StateHint Score s3,
                 @ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r) {
-            final ProcessTableOperator.RunnerContext internalContext =
-                    (ProcessTableOperator.RunnerContext) ctx;
+            final RunnerContext internalContext = (RunnerContext) ctx;
             if (s0.getFieldAs("emitted") == null) {
                 collect(
                         String.format(
@@ -751,19 +777,6 @@ public class ProcessTableFunctionTestUtils {
         }
     }
 
-    /** Testing function. */
-    public static class UpdatingUpsertPartialDeletesFunction
-            extends ChangelogProcessTableFunctionBase {
-        public void eval(Context ctx, @ArgumentHint({TABLE_AS_SET, 
SUPPORT_UPDATES}) Row r) {
-            collectUpdate(ctx, r);
-        }
-
-        @Override
-        public ChangelogMode getChangelogMode(ChangelogContext 
changelogContext) {
-            return ChangelogMode.upsert();
-        }
-    }
-
     /** Testing function. */
     public static class UpdatingUpsertFullDeletesFunction
             extends ChangelogProcessTableFunctionBase {
@@ -798,6 +811,103 @@ public class ProcessTableFunctionTestUtils {
         }
     }
 
+    /** Testing function. */
+    public static class MultiInputFunction extends 
AppendProcessTableFunctionBase {
+        public void eval(
+                Context ctx,
+                @ArgumentHint(TABLE_AS_SET) Row in1,
+                @ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row in2)
+                throws Exception {
+            collectObjects(in1, in2);
+        }
+    }
+
+    /** Testing function. */
+    public static class TimedJoinFunction extends 
AppendProcessTableFunctionBase {
+        public void eval(
+                Context ctx,
+                @StateHint Tuple1<Integer> score,
+                @StateHint Tuple1<String> city,
+                @ArgumentHint({TABLE_AS_SET, REQUIRE_ON_TIME}) Row scoreTable,
+                @ArgumentHint({TABLE_AS_SET, REQUIRE_ON_TIME}) Row cityTable)
+                throws Exception {
+            final TimeContext<Instant> timeCtx = 
ctx.timeContext(Instant.class);
+            if (scoreTable != null) {
+                score.f0 = scoreTable.getFieldAs("score");
+                timeCtx.registerOnTime("timeout", 
timeCtx.time().plusMillis(1000));
+            }
+            if (cityTable != null) {
+                city.f0 = cityTable.getFieldAs("city");
+            }
+            if (score.f0 != null && city.f0 != null) {
+                collect(Row.of(score.f0 + " score in city " + city.f0));
+                ctx.clearAllTimers();
+            }
+        }
+
+        public void onTimer(OnTimerContext ctx, Tuple1<Integer> score, 
Tuple1<String> city) {
+            collect(Row.of("no city found for score " + score.f0));
+            score.f0 = null;
+        }
+    }
+
+    /**
+     * Implements a custom join over two updating tables that emits upsert changes. Both the
+     * score and city can change at any time. The join outputs an insert or update whenever a
+     * matching pair exists and a deletion when an existing match is removed by a delete.
+     */
+    @DataTypeHint("ROW<out STRING>")
+    public static class UpdatingJoinFunction extends ProcessTableFunction<Row>
+            implements ChangelogFunction {
+        public void eval(
+                @StateHint Tuple1<Integer> score,
+                @StateHint Tuple1<String> city,
+                @ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row scoreTable,
+                @ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row cityTable)
+                throws Exception {
+            final boolean wasMatch = isMatch(score, city);
+            if (isDelete(scoreTable) || isDelete(cityTable)) {
+                if (wasMatch) {
+                    collect(Row.ofKind(RowKind.DELETE, (Object) null));
+                }
+            }
+
+            if (scoreTable != null) {
+                apply(score, scoreTable.getFieldAs("score"), 
scoreTable.getKind());
+            }
+            if (cityTable != null) {
+                apply(city, cityTable.getFieldAs("city"), cityTable.getKind());
+            }
+            if (isMatch(score, city)) {
+                collect(
+                        Row.ofKind(
+                                wasMatch ? RowKind.UPDATE_AFTER : 
RowKind.INSERT,
+                                "score " + score.f0 + " in city " + city.f0));
+            }
+        }
+
+        public boolean isDelete(Row r) {
+            return r != null && r.getKind() == RowKind.DELETE;
+        }
+
+        public boolean isMatch(Tuple1<Integer> score, Tuple1<String> city) {
+            return score.f0 != null && city.f0 != null;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogContext 
changelogContext) {
+            return ChangelogMode.upsert();
+        }
+
+        private static <T> void apply(Tuple1<T> t, T o, RowKind op) {
+            if (op == RowKind.INSERT || op == RowKind.UPDATE_AFTER) {
+                t.f0 = o;
+            } else {
+                t.f0 = null;
+            }
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Helpers
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index 45fb5dbb9b7..185eec0ae88 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -25,9 +25,11 @@ import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.AppendProcessTableFunctionBase;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidUpdatingSemanticsFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TableAsRowFunction;
@@ -37,7 +39,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedTableAsRowFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedTableAsSetFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingUpsertFunction;
-import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingUpsertPartialDeletesFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.User;
 import org.apache.flink.table.planner.utils.TableTestBase;
 import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -285,9 +286,9 @@ public class ProcessTableFunctionTest extends TableTestBase {
                         "Function signature must not declare system arguments. Reserved argument names are: [on_time, uid]"),
                 ErrorSpec.ofSelect(
                         "multiple table args",
-                        MultiTableFunction.class,
+                        InvalidMultiTableWithRowFunction.class,
                         "SELECT * FROM f(r1 => TABLE t, r2 => TABLE t)",
-                        "Currently, only signatures with at most one table argument are supported."),
+                        "All table arguments must use set semantics if multiple table arguments are declared."),
                 ErrorSpec.ofSelect(
                         "row instead of table",
                         TableAsRowFunction.class,
@@ -375,12 +376,58 @@ public class ProcessTableFunctionTest extends TableTestBase {
                         "SELECT * FROM f(r => TABLE t_watermarked PARTITION BY name, on_time => DESCRIPTOR(ts))",
                         "Time operations using the `on_time` argument are currently not supported for "
                                 + "PTFs that consume or produce updates."),
-                ErrorSpec.ofInsertInto(
-                        "PTF returns partial deletes but full deleted are required",
-                        UpdatingUpsertPartialDeletesFunction.class,
-                        "INSERT INTO t_full_delete_sink SELECT * FROM f(r => TABLE t PARTITION BY name)",
-                        "Unsupported changelog mode returned from PTF 'f'. The system requires that deletions include "
-                                + "all fields in DELETE messages. Key-only deletes are not sufficient."));
+                ErrorSpec.ofSelect(
+                        "no pass-through for multiple table args",
+                        InvalidPassThroughTables.class,
+                        "SELECT * FROM f(r1 => TABLE t, r2 => TABLE t)",
+                        "Pass-through columns are not supported if multiple table arguments are declared."),
+                ErrorSpec.ofSelect(
+                        "on_time must be declared for multiple table args",
+                        MultiInputFunction.class,
+                        "SELECT * FROM f(in1 => TABLE t PARTITION BY name, in2 => TABLE t_watermarked PARTITION BY name, "
+                                + "on_time => DESCRIPTOR(ts))",
+                        "Invalid time attribute declaration. If multiple tables are declared, "
+                                + "the `on_time` argument must reference a time column for each table argument "
+                                + "or none. Missing time attributes for: [in1]"),
+                ErrorSpec.ofSelect(
+                        "different partition keys by data type",
+                        MultiInputFunction.class,
+                        "SELECT * FROM f(in1 => TABLE t PARTITION BY score, in2 => TABLE t PARTITION BY name)",
+                        "Invalid PARTITION BY columns. The number of columns and their data types must match across all "
+                                + "involved table arguments. Given partition key sets: [INT NOT NULL], [VARCHAR(5) NOT NULL]"),
+                ErrorSpec.ofSelect(
+                        "different partition keys by column count",
+                        MultiInputFunction.class,
+                        "SELECT * FROM f(in1 => TABLE t PARTITION BY score, in2 => TABLE t)",
+                        "Invalid PARTITION BY columns. The number of columns and their data types must match across all "
+                                + "involved table arguments. Given partition key sets: [INT NOT NULL], []"),
+                ErrorSpec.ofSelect(
+                        "maximum table arguments reached",
+                        HighMultiInputFunction.class,
+                        "SELECT * FROM f("
+                                + "in1 => TABLE t PARTITION BY score, "
+                                + "in2 => TABLE t PARTITION BY score, "
+                                + "in3 => TABLE t PARTITION BY score, "
+                                + "in4 => TABLE t PARTITION BY score, "
+                                + "in5 => TABLE t PARTITION BY score, "
+                                + "in6 => TABLE t PARTITION BY score, "
+                                + "in7 => TABLE t PARTITION BY score, "
+                                + "in8 => TABLE t PARTITION BY score, "
+                                + "in9 => TABLE t PARTITION BY score, "
+                                + "in10 => TABLE t PARTITION BY score, "
+                                + "in11 => TABLE t PARTITION BY score, "
+                                + "in12 => TABLE t PARTITION BY score, "
+                                + "in13 => TABLE t PARTITION BY score, "
+                                + "in14 => TABLE t PARTITION BY score, "
+                                + "in15 => TABLE t PARTITION BY score, "
+                                + "in16 => TABLE t PARTITION BY score, "
+                                + "in17 => TABLE t PARTITION BY score, "
+                                + "in18 => TABLE t PARTITION BY score, "
+                                + "in19 => TABLE t PARTITION BY score, "
+                                + "in20 => TABLE t PARTITION BY score, "
+                                + "in21 => TABLE t PARTITION BY score"
+                                + ")",
+                        "Unsupported table argument count. Currently, the number of input tables is limited to 20."));
     }
 
     /** Testing function. */
@@ -390,11 +437,11 @@ public class ProcessTableFunctionTest extends TableTestBase {
     }
 
     /** Testing function. */
-    public static class MultiTableFunction extends ProcessTableFunction<String> {
+    public static class InvalidMultiTableWithRowFunction extends ProcessTableFunction<String> {
         @SuppressWarnings("unused")
         public void eval(
                 @ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r1,
-                @ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r2) {}
+                @ArgumentHint(TABLE_AS_ROW) Row r2) {}
     }
 
     /** Testing function. */
@@ -436,6 +483,42 @@ public class ProcessTableFunctionTest extends TableTestBase {
         public void eval(@ArgumentHint(value = TABLE_AS_ROW, isOptional = true) Row r) {}
     }
 
+    /** Testing function. */
+    public static class InvalidPassThroughTables extends ProcessTableFunction<String> {
+        @SuppressWarnings("unused")
+        public void eval(
+                @ArgumentHint({TABLE_AS_SET, PASS_COLUMNS_THROUGH}) Row r1,
+                @ArgumentHint({TABLE_AS_SET, PASS_COLUMNS_THROUGH}) Row r2) {}
+    }
+
+    /** Testing function. */
+    public static class HighMultiInputFunction extends AppendProcessTableFunctionBase {
+        @SuppressWarnings("unused")
+        public void eval(
+                @ArgumentHint(TABLE_AS_SET) Row in1,
+                @ArgumentHint(TABLE_AS_SET) Row in2,
+                @ArgumentHint(TABLE_AS_SET) Row in3,
+                @ArgumentHint(TABLE_AS_SET) Row in4,
+                @ArgumentHint(TABLE_AS_SET) Row in5,
+                @ArgumentHint(TABLE_AS_SET) Row in6,
+                @ArgumentHint(TABLE_AS_SET) Row in7,
+                @ArgumentHint(TABLE_AS_SET) Row in8,
+                @ArgumentHint(TABLE_AS_SET) Row in9,
+                @ArgumentHint(TABLE_AS_SET) Row in10,
+                @ArgumentHint(TABLE_AS_SET) Row in11,
+                @ArgumentHint(TABLE_AS_SET) Row in12,
+                @ArgumentHint(TABLE_AS_SET) Row in13,
+                @ArgumentHint(TABLE_AS_SET) Row in14,
+                @ArgumentHint(TABLE_AS_SET) Row in15,
+                @ArgumentHint(TABLE_AS_SET) Row in16,
+                @ArgumentHint(TABLE_AS_SET) Row in17,
+                @ArgumentHint(TABLE_AS_SET) Row in18,
+                @ArgumentHint(TABLE_AS_SET) Row in19,
+                @ArgumentHint(TABLE_AS_SET) Row in20,
+                @ArgumentHint(TABLE_AS_SET) Row in21)
+                throws Exception {}
+    }
+
     private static class ErrorSpec {
         private final String description;
         private final Class<? extends UserDefinedFunction> functionClass;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index f285f0c3d46..eda3a6c2442 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.functions.ProcessTableFunction;
+import 
org.apache.flink.table.runtime.operators.process.AbstractProcessTableOperator;
+import 
org.apache.flink.table.runtime.operators.process.AbstractProcessTableOperator.RunnerContext;
+import 
org.apache.flink.table.runtime.operators.process.AbstractProcessTableOperator.RunnerOnTimerContext;
 import org.apache.flink.table.runtime.operators.process.PassAllCollector;
 import 
org.apache.flink.table.runtime.operators.process.PassThroughCollectorBase;
-import org.apache.flink.table.runtime.operators.process.ProcessTableOperator;
-import 
org.apache.flink.table.runtime.operators.process.ProcessTableOperator.RunnerContext;
-import 
org.apache.flink.table.runtime.operators.process.ProcessTableOperator.RunnerOnTimerContext;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -40,7 +40,7 @@ import java.util.Arrays;
 
 /**
  * Abstraction of code-generated calls to {@link ProcessTableFunction} to be 
used within {@link
- * ProcessTableOperator}.
+ * AbstractProcessTableOperator}.
  */
 @Internal
 public abstract class ProcessTableRunner extends AbstractRichFunction {
@@ -105,7 +105,7 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
     }
 
     public void ingestTableEvent(int pos, RowData row, int timeColumn) {
-        evalCollector.setPrefix(row);
+        evalCollector.setPrefix(pos, row);
         if (timeColumn == -1) {
             rowtime = null;
         } else {
@@ -120,7 +120,7 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
     }
 
     public void ingestTimerEvent(RowData key, @Nullable StringData name, long 
timerTime) {
-        onTimerCollector.setPrefix(key);
+        onTimerCollector.setPrefix(-1, key);
         if (emitRowtime) {
             onTimerCollector.setRowtime(timerTime);
         }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
similarity index 88%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index cf7a6bda043..5432a826cc2 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -33,14 +33,12 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.api.dataview.MapView;
@@ -73,15 +71,17 @@ import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.stream.Collectors;
 
-/** Operator for {@link ProcessTableFunction}. */
-public class ProcessTableOperator extends AbstractStreamOperator<RowData>
-        implements OneInputStreamOperator<RowData, RowData>, 
Triggerable<RowData, Object> {
+/** Base class for operators for {@link ProcessTableFunction}. */
+@Internal
+public abstract class AbstractProcessTableOperator extends 
AbstractStreamOperatorV2<RowData>
+        implements Triggerable<RowData, Object> {
+
+    protected final List<RuntimeTableSemantics> tableSemantics;
+    protected final ProcessTableRunner processTableRunner;
 
-    private final @Nullable RuntimeTableSemantics tableSemantics;
     private final List<RuntimeStateInfo> stateInfos;
-    private final ProcessTableRunner processTableRunner;
     private final HashFunction[] stateHashCode;
     private final RecordEqualiser[] stateEquals;
     private final RuntimeChangelogMode producedChangelogMode;
@@ -97,15 +97,16 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
     private transient @Nullable InternalTimerService<StringData> 
namedTimerService;
     private transient @Nullable InternalTimerService<VoidNamespace> 
unnamedTimerService;
 
-    public ProcessTableOperator(
+    public AbstractProcessTableOperator(
             StreamOperatorParameters<RowData> parameters,
-            @Nullable RuntimeTableSemantics tableSemantics,
+            List<RuntimeTableSemantics> tableSemantics,
             List<RuntimeStateInfo> stateInfos,
             ProcessTableRunner processTableRunner,
             HashFunction[] stateHashCode,
             RecordEqualiser[] stateEquals,
             RuntimeChangelogMode producedChangelogMode) {
-        super(parameters);
+        // Operator always has at least one input (i.e. empty values)
+        super(parameters, Math.max(tableSemantics.size(), 1));
         this.tableSemantics = tableSemantics;
         this.stateInfos = stateInfos;
         this.processTableRunner = processTableRunner;
@@ -144,15 +145,6 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
         FunctionUtils.openFunction(processTableRunner, 
DefaultOpenContext.INSTANCE);
     }
 
-    @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception 
{
-        // Set table argument
-        if (tableSemantics != null) {
-            processTableRunner.ingestTableEvent(0, element.getValue(), 
tableSemantics.timeColumn());
-        }
-        processTableRunner.processEval();
-    }
-
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
@@ -185,9 +177,9 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
         }
 
         private Map<String, RuntimeTableSemantics> createTableSemanticsMap() {
-            return Optional.ofNullable(tableSemantics)
-                    .map(s -> Map.of(tableSemantics.getArgName(), 
tableSemantics))
-                    .orElse(Map.of());
+            return tableSemantics.stream()
+                    .map(inputSemantics -> 
Map.entry(inputSemantics.getArgName(), inputSemantics))
+                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
         }
 
         private Map<String, Integer> createStateNameToPosMap() {
@@ -289,7 +281,8 @@ public class ProcessTableOperator extends AbstractStreamOperator<RowData>
     @SuppressWarnings({"unchecked", "rawtypes"})
     private void setTimerServices() {
         if (shouldEnableTimers()) {
-            final KeyedStateStore keyedStateStore = getKeyedStateStore();
+            final KeyedStateStore keyedStateStore =
+                    getKeyedStateStore().orElseThrow(IllegalStateException::new);
             final MapStateDescriptor<StringData, Long> namedTimersDescriptor =
                     new MapStateDescriptor<>(
                             "internal-named-timers-map",
@@ -322,14 +315,20 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
     }
 
     private void setCollectors() {
-        if (tableSemantics == null || tableSemantics.passColumnsThrough()) {
-            evalCollector = new PassAllCollector(output, changelogMode);
+        final int tableCount = tableSemantics.size();
+        if (tableCount == 0
+                || tableSemantics.stream().anyMatch(RuntimeTableSemantics::passColumnsThrough)) {
+            assert tableCount <= 1;
+            // Collect from table event with all input columns (potentially none)
+            evalCollector = new PassAllCollector(output, changelogMode, 1);
         } else {
-            evalCollector =
-                    new PassPartitionKeysCollector(
-                            output, changelogMode, tableSemantics.partitionByColumns());
+            // Collect from table event with partition keys for each table
+            evalCollector = new PassPartitionKeysCollector(output, changelogMode, tableSemantics);
         }
-        onTimerCollector = new PassAllCollector(output, changelogMode);
+
+        // Collect with partition keys for each table but from timer events which only contains the
+        // key, so passing all columns is the right strategy
+        onTimerCollector = new PassAllCollector(output, changelogMode, tableCount);
     }
 
     private void setStateDescriptors() {
@@ -374,9 +373,10 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
     }
 
     private void setStateHandles() {
-        final KeyedStateStore keyedStateStore = getKeyedStateStore();
         final State[] stateHandles = new State[stateDescriptors.length];
         for (int i = 0; i < stateDescriptors.length; i++) {
+            final KeyedStateStore keyedStateStore =
+                    getKeyedStateStore().orElseThrow(IllegalStateException::new);
             final StateDescriptor<?, ?> stateDescriptor = stateDescriptors[i];
             final State stateHandle;
             if (stateDescriptor instanceof ValueStateDescriptor) {
@@ -396,12 +396,13 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
     }
 
     private boolean shouldEmitRowtime() {
-        return tableSemantics != null && tableSemantics.timeColumn() != -1;
+        return !tableSemantics.isEmpty()
+                && tableSemantics.stream().allMatch(input -> input.timeColumn() != -1);
     }
 
     private boolean shouldEnableTimers() {
-        return tableSemantics != null
-                && tableSemantics.hasSetSemantics()
-                && !tableSemantics.passColumnsThrough();
+        return !tableSemantics.isEmpty()
+                && tableSemantics.stream()
+                        .allMatch(input -> input.hasSetSemantics() && !input.passColumnsThrough());
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassAllCollector.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassAllCollector.java
index d82b115a684..ac00126f1e8 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassAllCollector.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassAllCollector.java
@@ -28,11 +28,15 @@ import org.apache.flink.table.data.RowData;
 @Internal
 public class PassAllCollector extends PassThroughCollectorBase {
 
-    public PassAllCollector(Output<StreamRecord<RowData>> output, 
ChangelogMode changelogMode) {
-        super(output, changelogMode);
+    public PassAllCollector(
+            Output<StreamRecord<RowData>> output,
+            ChangelogMode changelogMode,
+            int prefixRepetition) {
+        super(output, changelogMode, prefixRepetition);
     }
 
-    public void setPrefix(RowData input) {
+    @Override
+    public void setPrefix(int pos, RowData input) {
         prefix = input;
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassPartitionKeysCollector.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassPartitionKeysCollector.java
index e37b4f37a14..f5a309de568 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassPartitionKeysCollector.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassPartitionKeysCollector.java
@@ -25,19 +25,31 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.ProjectedRowData;
 
-/** Forwards partition keys of the given row. */
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Forwards partition keys of the given input's row. */
 @Internal
 public class PassPartitionKeysCollector extends PassThroughCollectorBase {
 
+    private final ProjectedRowData[] partitionKeys;
+
     public PassPartitionKeysCollector(
             Output<StreamRecord<RowData>> output,
             ChangelogMode changelogMode,
-            int[] partitionKeys) {
-        super(output, changelogMode);
-        prefix = ProjectedRowData.from(partitionKeys);
+            List<RuntimeTableSemantics> tableSemantics) {
+        super(output, changelogMode, tableSemantics.size());
+        partitionKeys = new ProjectedRowData[tableSemantics.size()];
+        IntStream.range(0, tableSemantics.size())
+                .forEach(
+                        pos ->
+                                partitionKeys[pos] =
+                                        ProjectedRowData.from(
+                                                
tableSemantics.get(pos).partitionByColumns()));
     }
 
-    public void setPrefix(RowData input) {
-        ((ProjectedRowData) prefix).replaceRow(input);
+    @Override
+    public void setPrefix(int pos, RowData input) {
+        prefix = partitionKeys[pos].replaceRow(input);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassThroughCollectorBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassThroughCollectorBase.java
index e8cc1015f85..5b032c3891f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassThroughCollectorBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/PassThroughCollectorBase.java
@@ -34,25 +34,30 @@ import org.apache.flink.types.RowKind;
 @Internal
 public abstract class PassThroughCollectorBase extends 
StreamRecordCollector<RowData> {
 
-    private final JoinedRowData withPrefix;
+    private final RepeatedRowData repeatedPrefix;
+    private final JoinedRowData withFunctionOutput;
     private final JoinedRowData withRowtime;
     private final ChangelogMode changelogMode;
 
-    private RowData rowtime;
     protected RowData prefix;
 
+    private RowData rowtime;
+
     public PassThroughCollectorBase(
-            Output<StreamRecord<RowData>> output, ChangelogMode changelogMode) 
{
+            Output<StreamRecord<RowData>> output,
+            ChangelogMode changelogMode,
+            int prefixRepetition) {
         super(output);
         this.changelogMode = changelogMode;
-        // constructs a flattened row of [[prefix | function output] | rowtime]
-        withPrefix = new JoinedRowData();
+        // constructs a flattened row of [[[prefix]{1,n} | function output] | 
rowtime]
+        repeatedPrefix = new RepeatedRowData(prefixRepetition);
+        withFunctionOutput = new JoinedRowData();
         withRowtime = new JoinedRowData();
         prefix = GenericRowData.of();
         rowtime = GenericRowData.of();
     }
 
-    public abstract void setPrefix(RowData input);
+    public abstract void setPrefix(int pos, RowData input);
 
     public void setRowtime(Long time) {
         rowtime = GenericRowData.of(TimestampData.fromEpochMillis(time));
@@ -60,8 +65,9 @@ public abstract class PassThroughCollectorBase extends 
StreamRecordCollector<Row
 
     @Override
     public void collect(RowData functionOutput) {
-        withPrefix.replace(prefix, functionOutput);
-        withRowtime.replace(withPrefix, rowtime);
+        repeatedPrefix.replace(prefix);
+        withFunctionOutput.replace(repeatedPrefix, functionOutput);
+        withRowtime.replace(withFunctionOutput, rowtime);
         // Forward supported change flags.
         final RowKind kind = functionOutput.getRowKind();
         if (!changelogMode.contains(kind)) {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
new file mode 100644
index 00000000000..075f589e926
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.process;
+
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.ProcessTableRunner;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Implementation of {@link OneInputStreamOperator} for {@link ProcessTableFunction} with at most
+ * one table with row semantics.
+ *
+ * <p>This class is required because {@link MultipleInputStreamOperator} has issues with chaining
+ * when the transformation is not keyed.
+ */
+public class ProcessRowTableOperator extends AbstractProcessTableOperator
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private final @Nullable TableSemantics inputSemantics;
+
+    public ProcessRowTableOperator(
+            StreamOperatorParameters<RowData> parameters,
+            List<RuntimeTableSemantics> tableSemantics,
+            List<RuntimeStateInfo> stateInfos,
+            ProcessTableRunner processTableRunner,
+            HashFunction[] stateHashCode,
+            RecordEqualiser[] stateEquals,
+            RuntimeChangelogMode producedChangelogMode) {
+        super(
+                parameters,
+                tableSemantics,
+                stateInfos,
+                processTableRunner,
+                stateHashCode,
+                stateEquals,
+                producedChangelogMode);
+        if (tableSemantics.isEmpty()) {
+            inputSemantics = null;
+        } else {
+            inputSemantics = tableSemantics.get(0);
+        }
+    }
+
+    @Override
+    public void setKeyContextElement1(StreamRecord<?> record) {
+        // not applicable
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (inputSemantics != null) {
+            processTableRunner.ingestTableEvent(0, element.getValue(), inputSemantics.timeColumn());
+        }
+        processTableRunner.processEval();
+    }
+
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+        super.processWatermarkStatus(watermarkStatus, 1);
+    }
+
+    @Override
+    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+        super.reportOrForwardLatencyMarker(latencyMarker);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperator.java
new file mode 100644
index 00000000000..948464d5f3f
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.process;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.ProcessTableRunner;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of {@link MultipleInputStreamOperator} for {@link ProcessTableFunction} with at
+ * least one table with set semantics.
+ *
+ * <p>Exposes one {@link Input} per entry of {@code tableSemantics}. Every record arriving on an
+ * input is handed to the {@link ProcessTableRunner} together with the zero-based index of the
+ * table it belongs to, followed by a call to the runner's {@code processEval()}.
+ */
+public class ProcessSetTableOperator extends AbstractProcessTableOperator
+        implements MultipleInputStreamOperator<RowData> {
+
+    /** Passes all arguments unchanged to the parent constructor. */
+    public ProcessSetTableOperator(
+            StreamOperatorParameters<RowData> parameters,
+            List<RuntimeTableSemantics> tableSemantics,
+            List<RuntimeStateInfo> stateInfos,
+            ProcessTableRunner processTableRunner,
+            HashFunction[] stateHashCode,
+            RecordEqualiser[] stateEquals,
+            RuntimeChangelogMode producedChangelogMode) {
+        super(
+                parameters,
+                tableSemantics,
+                stateInfos,
+                processTableRunner,
+                stateHashCode,
+                stateEquals,
+                producedChangelogMode);
+    }
+
+    /**
+     * Builds the inputs of this operator, one per entry of {@code tableSemantics} and in the same
+     * order.
+     *
+     * <p>A new list with new {@link AbstractInput} instances is created on every call.
+     *
+     * @return the inputs; the input for table index {@code i} is created with input id
+     *     {@code i + 1}
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<Input> getInputs() {
+        return IntStream.range(0, tableSemantics.size())
+                .mapToObj(
+                        inputIdx -> {
+                            final TableSemantics inputSemantics = 
tableSemantics.get(inputIdx);
+                            // Read once while building the input instead of once per record.
+                            final int timeColumn = inputSemantics.timeColumn();
+                            // Input ids passed to AbstractInput are one-based, whereas the runner
+                            // is addressed with the zero-based table index.
+                            return new AbstractInput<RowData, RowData>(this, 
inputIdx + 1) {
+                                @Override
+                                public void 
processElement(StreamRecord<RowData> element)
+                                        throws Exception {
+                                    processTableRunner.ingestTableEvent(
+                                            inputIdx, element.getValue(), 
timeColumn);
+                                    processTableRunner.processEval();
+                                }
+                            };
+                        })
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperatorFactory.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperatorFactory.java
index 68373af78e5..d0eed417f11 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperatorFactory.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperatorFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.runtime.operators.process;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.table.data.RowData;
@@ -30,18 +29,15 @@ import 
org.apache.flink.table.runtime.generated.HashFunction;
 import org.apache.flink.table.runtime.generated.ProcessTableRunner;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 
-import javax.annotation.Nullable;
-
 import java.util.Arrays;
 import java.util.List;
 
-/** The factory of {@link ProcessTableOperator}. */
-public class ProcessTableOperatorFactory extends 
AbstractStreamOperatorFactory<RowData>
-        implements OneInputStreamOperatorFactory<RowData, RowData> {
+/** The factory for subclasses of {@link AbstractProcessTableOperator}. */
+public class ProcessTableOperatorFactory extends 
AbstractStreamOperatorFactory<RowData> {
 
     private static final long serialVersionUID = 1L;
 
-    private final @Nullable RuntimeTableSemantics tableSemantics;
+    private final List<RuntimeTableSemantics> tableSemantics;
     private final List<RuntimeStateInfo> stateInfos;
     private final GeneratedProcessTableRunner generatedProcessTableRunner;
     private final GeneratedHashFunction[] generatedStateHashCode;
@@ -49,7 +45,7 @@ public class ProcessTableOperatorFactory extends 
AbstractStreamOperatorFactory<R
     private final RuntimeChangelogMode producedChangelogMode;
 
     public ProcessTableOperatorFactory(
-            @Nullable RuntimeTableSemantics tableSemantics,
+            List<RuntimeTableSemantics> tableSemantics,
             List<RuntimeStateInfo> stateInfos,
             GeneratedProcessTableRunner generatedProcessTableRunner,
             GeneratedHashFunction[] generatedStateHashCode,
@@ -76,19 +72,34 @@ public class ProcessTableOperatorFactory extends 
AbstractStreamOperatorFactory<R
                 Arrays.stream(generatedStateEquals)
                         .map(g -> g.newInstance(classLoader))
                         .toArray(RecordEqualiser[]::new);
-        return new ProcessTableOperator(
-                parameters,
-                tableSemantics,
-                stateInfos,
-                runner,
-                stateHashCode,
-                stateEquals,
-                producedChangelogMode);
+        if 
(tableSemantics.stream().anyMatch(RuntimeTableSemantics::hasSetSemantics)) {
+            return new ProcessSetTableOperator(
+                    parameters,
+                    tableSemantics,
+                    stateInfos,
+                    runner,
+                    stateHashCode,
+                    stateEquals,
+                    producedChangelogMode);
+        } else {
+            return new ProcessRowTableOperator(
+                    parameters,
+                    tableSemantics,
+                    stateInfos,
+                    runner,
+                    stateHashCode,
+                    stateEquals,
+                    producedChangelogMode);
+        }
     }
 
     @Override
     @SuppressWarnings("rawtypes")
     public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
-        return ProcessTableOperator.class;
+        // Uses the same predicate as the operator creation above, so that the reported class
+        // matches the operator that is actually instantiated.
+        if 
(tableSemantics.stream().anyMatch(RuntimeTableSemantics::hasSetSemantics)) {
+            return ProcessSetTableOperator.class;
+        } else {
+            return ProcessRowTableOperator.class;
+        }
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RepeatedRowData.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RepeatedRowData.java
new file mode 100644
index 00000000000..3cdfa23a00b
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RepeatedRowData.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.process;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * A row that repeats the columns of a given row by the given count.
+ *
+ * <p>This is a view: no data is copied. The reported arity is the arity of the backing row
+ * multiplied by {@code count}, and every accessor for position {@code pos} delegates to position
+ * {@code pos / count} (integer division) of the backing row. With {@code count = 2} and a backing
+ * row {@code [a, b]}, positions 0 to 3 therefore read {@code [a, a, b, b]}.
+ *
+ * <p>NOTE(review): if callers expect the whole backing row to be repeated back to back (i.e.
+ * {@code [a, b, a, b]}), the index would have to be {@code pos % row.getArity()} instead. Both
+ * layouts coincide for a backing row of arity 1 - TODO confirm the intended layout for wider rows.
+ *
+ * <p>A backing row must be set via {@link #replace(RowData)} before any other method is called;
+ * until then the backing row is {@code null}.
+ */
+public class RepeatedRowData implements RowData {
+
+    // Number of times each column of the backing row is exposed.
+    private final int count;
+    // Backing row; not set by the constructor, only by replace(RowData).
+    private RowData row;
+
+    /**
+     * Creates a view without a backing row.
+     *
+     * @param count factor by which the arity of the backing row is multiplied
+     */
+    public RepeatedRowData(int count) {
+        this.count = count;
+    }
+
+    /**
+     * Replaces the {@link RowData} backing this {@link RepeatedRowData}.
+     *
+     * <p>This method replaces the backing row in place and does not return a new object. This is
+     * done for performance reasons.
+     *
+     * @param row the new backing row
+     * @return this instance
+     */
+    public RepeatedRowData replace(RowData row) {
+        this.row = row;
+        return this;
+    }
+
+    /** Returns the arity of the backing row multiplied by {@code count}. */
+    @Override
+    public int getArity() {
+        return row.getArity() * count;
+    }
+
+    /** Returns the row kind of the backing row. */
+    @Override
+    public RowKind getRowKind() {
+        return row.getRowKind();
+    }
+
+    /** Sets the row kind on the backing row itself, i.e. the backing row is modified. */
+    @Override
+    public void setRowKind(RowKind kind) {
+        row.setRowKind(kind);
+    }
+
+    // All accessors below translate the requested position to pos / count of the backing row.
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(pos / count);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(pos / count);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(pos / count);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(pos / count);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(pos / count);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(pos / count);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(pos / count);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(pos / count);
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return row.getString(pos / count);
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(pos / count, precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return row.getTimestamp(pos / count, precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        return row.getRawValue(pos / count);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return row.getBinary(pos / count);
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return row.getArray(pos / count);
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return row.getMap(pos / count);
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        return row.getRow(pos / count, numFields);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
index 378f494b79e..c5d4860d8f4 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.DataType;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -108,11 +107,6 @@ public class RuntimeTableSemantics implements 
TableSemantics, Serializable {
         return timeColumn;
     }
 
-    @Override
-    public List<String> coPartitionArgs() {
-        return List.of();
-    }
-
     @Override
     public Optional<ChangelogMode> changelogMode() {
         return Optional.of(getChangelogMode());


Reply via email to