This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5749a07b072 [FLINK-37076][table-planner] Support PTFs until ExecNode level
5749a07b072 is described below

commit 5749a07b072803a6f106e287f40db069abc3feaf
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Jan 21 15:36:31 2025 +0100

    [FLINK-37076][table-planner] Support PTFs until ExecNode level
    
    This closes #26019.
---
 .../flink/table/annotation/ArgumentTrait.java      |  31 +-
 .../table/types/inference/StaticArgument.java      |  46 ++-
 .../table/types/inference/StaticArgumentTrait.java |   5 +-
 .../table/types/inference/SystemTypeInference.java |  12 +-
 .../table/planner/calcite/RexTableArgCall.java     |  26 ++
 .../plan/nodes/exec/ExecNodeGraphGenerator.java    |  25 +-
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  28 ++
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |  27 +-
 .../stream/StreamExecProcessTableFunction.java     | 125 ++++++++
 .../logical/FlinkLogicalTableFunctionScan.java     | 128 ++++++++
 .../stream/StreamPhysicalProcessTableFunction.java | 219 +++++++++++++
 .../StreamPhysicalProcessTableFunctionRule.java    | 135 ++++++++
 ...BatchPhysicalConstantTableFunctionScanRule.java |  91 +++---
 ...treamPhysicalConstantTableFunctionScanRule.java |  77 +++--
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   4 +-
 .../flink/table/planner/utils/ShortcutUtils.java   |   5 +
 .../logical/FlinkLogicalTableFunctionScan.scala    | 135 --------
 .../FlinkChangelogModeInferenceProgram.scala       |  48 +++
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   3 +
 .../nodes/exec/serde/RexNodeJsonSerdeTest.java     |  13 +-
 .../plan/stream/sql/ProcessTableFunctionTest.java  | 149 +++++++--
 .../plan/stream/sql/ProcessTableFunctionTest.xml   | 340 ++++++++++++++++++++-
 22 files changed, 1398 insertions(+), 274 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
index 8704a3c440b..4e63b261e7d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.annotation;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.types.RowKind;
 
 /**
  * Declares traits for {@link ArgumentHint}. They enable basic validation by the framework.
@@ -78,6 +79,8 @@ public enum ArgumentTrait {
     /**
      * Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is
      * mandatory to improve parallel execution by distributing the table by key.
+     *
+     * <p>Note: This trait is only valid for {@link #TABLE_AS_SET} arguments.
      */
     OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY),
 
@@ -97,8 +100,34 @@ public enum ArgumentTrait {
      *
      * <p>In case of multiple table arguments, pass-through columns are added according to the
      * declaration order in the PTF signature.
+     *
+     * <p>Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link #TABLE_AS_SET} arguments.
+     */
+    PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH),
+
+    /**
+     * Defines that updates are allowed as input to the given table argument. By default, a table
+     * argument is insert-only and updates will be rejected.
+     *
+     * <p>Input tables become updating when subqueries such as aggregations or outer joins force an
+     * incremental computation. For example, the following query only works if the function is able
+     * to digest retraction messages:
+     *
+     * <pre>
+     *     // Changes +I[1] followed by -U[1], +U[2], -U[2], +U[3] will enter the function
+     *     WITH UpdatingTable AS (
+     *       SELECT COUNT(*) FROM (VALUES 1, 2, 3)
+     *     )
+     *     SELECT * FROM f(tableArg => TABLE UpdatingTable)
+     * </pre>
+     *
+     * <p>If updates should be supported, ensure that the data type of the table argument is chosen
+     * in a way that it can encode changes. In other words: choose a row type that exposes the
+     * {@link RowKind} change flag.
+     *
+     * <p>Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link #TABLE_AS_SET} arguments.
      */
-    PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH);
+    SUPPORT_UPDATES(false, StaticArgumentTrait.SUPPORT_UPDATES);
 
     private final boolean isRoot;
     private final StaticArgumentTrait staticTrait;
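
For illustration, a PTF declaring the new trait could look like the following sketch
(class, argument, and output names are invented; only the ArgumentTrait and
ArgumentHint semantics come from this change):

    import org.apache.flink.table.annotation.ArgumentHint;
    import org.apache.flink.table.annotation.ArgumentTrait;
    import org.apache.flink.table.functions.ProcessTableFunction;
    import org.apache.flink.types.Row;

    // Hypothetical PTF whose table argument accepts updating input.
    public class DigestUpdates extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.SUPPORT_UPDATES})
                        Row input) {
            // SUPPORT_UPDATES requires a row type, so the RowKind change flag is available
            collect("change: " + input.getKind());
        }
    }
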
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
index fc7fd86abb2..73f8afbf60f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.NullType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.StructuredType;
@@ -273,29 +274,46 @@ public class StaticArgument {
         }
     }
 
-    void checkTableType() {
+    private void checkTableType() {
         if (!traits.contains(StaticArgumentTrait.TABLE)) {
             return;
         }
-        if (dataType == null
-                && conversionClass != null
-                && !DUMMY_ROW_TYPE.supportsInputConversion(conversionClass)) {
+        checkPolymorphicTableType();
+        checkTypedTableType();
+    }
+
+    private void checkPolymorphicTableType() {
+        if (dataType != null || conversionClass == null) {
+            return;
+        }
+        if (!DUMMY_ROW_TYPE.supportsInputConversion(conversionClass)) {
             throw new ValidationException(
                     String.format(
                             "Invalid conversion class '%s' for argument '%s'. "
                                     + "Polymorphic, untyped table arguments 
must use a row class.",
                             conversionClass.getName(), name));
         }
-        if (dataType != null) {
-            final LogicalType type = dataType.getLogicalType();
-            if (traits.contains(StaticArgumentTrait.TABLE)
-                    && !LogicalTypeChecks.isCompositeType(type)) {
-                throw new ValidationException(
-                        String.format(
-                                "Invalid data type '%s' for table argument '%s'. "
-                                        + "Typed table arguments must use a composite type (i.e. row or structured type).",
-                                type, name));
-            }
+    }
+
+    private void checkTypedTableType() {
+        if (dataType == null) {
+            return;
+        }
+        final LogicalType type = dataType.getLogicalType();
+        if (traits.contains(StaticArgumentTrait.TABLE)
+                && !LogicalTypeChecks.isCompositeType(type)) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid data type '%s' for table argument '%s'. "
+                                    + "Typed table arguments must use a composite type (i.e. row or structured type).",
+                            type, name));
+        }
+        if (is(StaticArgumentTrait.SUPPORT_UPDATES) && !type.is(LogicalTypeRoot.ROW)) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid data type '%s' for table argument '%s'. "
+                                    + "Table arguments that support updates must use a row type.",
+                            type, name));
         }
     }
 }
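
In effect, the split keeps the previous composite-type check and adds one new
constraint for typed table arguments; a small sketch of what the typed path now
accepts and rejects (assuming the usual DataTypes factories, field names invented):

    import org.apache.flink.table.api.DataTypes;

    // Accepted for a SUPPORT_UPDATES table argument: a plain ROW type,
    // since only rows can expose the RowKind change flag.
    DataTypes.ROW(DataTypes.FIELD("score", DataTypes.INT()));

    // Rejected by checkTypedTableType(): a composite but non-ROW type such as a
    // structured type now fails with
    // "Table arguments that support updates must use a row type."
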
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
index b9f4d4c71fb..fbcc7be7879 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -37,8 +37,9 @@ public enum StaticArgumentTrait {
     MODEL(),
     TABLE_AS_ROW(TABLE),
     TABLE_AS_SET(TABLE),
-    OPTIONAL_PARTITION_BY(TABLE_AS_SET),
-    PASS_COLUMNS_THROUGH(TABLE);
+    PASS_COLUMNS_THROUGH(TABLE),
+    SUPPORT_UPDATES(TABLE),
+    OPTIONAL_PARTITION_BY(TABLE_AS_SET);
 
     private final Set<StaticArgumentTrait> requirements;
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index 6c64b32ec25..d3b18b3ab12 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -81,6 +81,10 @@ public class SystemTypeInference {
         return builder.build();
     }
 
+    public static boolean isValidUidForProcessTableFunction(String uid) {
+        return UID_FORMAT.test(uid);
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static void checkScalarArgsOnly(List<StaticArgument> defaultArgs) {
@@ -283,7 +287,7 @@ public class SystemTypeInference {
                                 + "that is not overloaded and doesn't contain 
varargs.");
             }
 
-            checkUidColumn(callContext);
+            checkUidArg(callContext);
             checkMultipleTableArgs(callContext);
             checkTableArgTraits(staticArgs, callContext);
 
@@ -297,16 +301,16 @@ public class SystemTypeInference {
             return origin.getExpectedSignatures(definition);
         }
 
-        private static void checkUidColumn(CallContext callContext) {
+        private static void checkUidArg(CallContext callContext) {
             final List<DataType> args = callContext.getArgumentDataTypes();
 
             // Verify the uid format if provided
             int uidPos = args.size() - 1;
             if (!callContext.isArgumentNull(uidPos)) {
                 final String uid = callContext.getArgumentValue(uidPos, String.class).orElse("");
-                if (!UID_FORMAT.test(uid)) {
+                if (!isValidUidForProcessTableFunction(uid)) {
                     throw new ValidationException(
-                            "Invalid unique identifier for process table function. The 'uid' argument "
+                            "Invalid unique identifier for process table function. The `uid` argument "
                                     + "must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
                                     + "But found: "
                                     + uid);
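
The uid format itself is easy to check in isolation; a standalone sketch (the
pattern string is taken from the error message above, not from the unshown
UID_FORMAT definition):

    import java.util.regex.Pattern;

    public class UidFormatCheck {
        public static void main(String[] args) {
            Pattern uidFormat = Pattern.compile("[a-zA-Z_][a-zA-Z-_0-9]*");
            System.out.println(uidFormat.matcher("my-id").matches()); // true
            System.out.println(uidFormat.matcher("1-id").matches());  // false: starts with a digit
        }
    }
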
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java
index 369f392042b..adbcd0633a3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -96,4 +97,29 @@ public class RexTableArgCall extends RexCall {
     public RexTableArgCall copy(RelDataType type, int[] partitionKeys, int[] orderKeys) {
         return new RexTableArgCall(type, inputIndex, partitionKeys, orderKeys);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        final RexTableArgCall that = (RexTableArgCall) o;
+        return inputIndex == that.inputIndex
+                && Arrays.equals(partitionKeys, that.partitionKeys)
+                && Arrays.equals(orderKeys, that.orderKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(super.hashCode(), inputIndex);
+        result = 31 * result + Arrays.hashCode(partitionKeys);
+        result = 31 * result + Arrays.hashCode(orderKeys);
+        return result;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
index beadfc4f125..3d9e9dbc26c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
@@ -19,15 +19,19 @@
 package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.plan.nodes.common.CommonIntermediateTableScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
 
 import org.apache.calcite.rel.RelNode;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A generator that generates a {@link ExecNode} graph from a graph of {@link FlinkPhysicalRel}s.
@@ -43,9 +47,11 @@ import java.util.Map;
 public class ExecNodeGraphGenerator {
 
     private final Map<FlinkPhysicalRel, ExecNode<?>> visitedRels;
+    private final Set<String> visitedProcessTableFunctionUids;
 
     public ExecNodeGraphGenerator() {
         this.visitedRels = new IdentityHashMap<>();
+        this.visitedProcessTableFunctionUids = new HashSet<>();
     }
 
     public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes, boolean isCompiled) {
@@ -78,8 +84,25 @@
             inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
         }
         execNode.setInputEdges(inputEdges);
-
+        checkUidForProcessTableFunction(execNode);
         visitedRels.put(rel, execNode);
         return execNode;
     }
+
+    private void checkUidForProcessTableFunction(ExecNode<?> execNode) {
+        if (!(execNode instanceof StreamExecProcessTableFunction)) {
+            return;
+        }
+        final String uid = ((StreamExecProcessTableFunction) execNode).getUid();
+        if (visitedProcessTableFunctionUids.contains(uid)) {
+            throw new ValidationException(
+                    String.format(
+                            "Duplicate unique identifier '%s' detected among process table functions. "
+                                    + "Make sure that all PTF calls have an identifier defined that is globally unique. "
+                                    + "Please provide a custom identifier using the implicit `uid` argument. "
+                                    + "For example: myFunction(..., uid => 'my-id')",
+                            uid));
+        }
+        visitedProcessTableFunctionUids.add(uid);
+    }
 }
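
The new set guards against plans that reuse one identifier across several PTF
calls; an illustrative scenario (function and table names invented):

    // Two calls with distinct uids pass the check:
    //   SELECT * FROM f(r => TABLE t, uid => 'ptf-1')
    //   UNION ALL
    //   SELECT * FROM f(r => TABLE t, uid => 'ptf-2')
    // If both calls used uid => 'ptf-1' (or both fell back to the same derived
    // uid), generate() would now fail with the duplicate-identifier error above.
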
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index c5111219283..89a30ff9a40 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
@@ -93,6 +94,8 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NAME;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NULL_AS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_ORDER_KEYS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_PARTITION_KEYS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SQL_KIND;
@@ -107,6 +110,7 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_INPUT_REF;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_LITERAL;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_PATTERN_INPUT_REF;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_TABLE_ARG_CALL;
 import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableToCalcite;
 
 /**
@@ -144,6 +148,8 @@ final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
                 return deserializeCorrelVariable(jsonNode, serdeContext);
             case KIND_PATTERN_INPUT_REF:
                 return deserializePatternFieldRef(jsonNode, serdeContext);
+            case KIND_TABLE_ARG_CALL:
+                return deserializeTableArgCall(jsonNode, serdeContext);
             case KIND_CALL:
                 return deserializeCall(jsonNode, serdeContext);
             default:
@@ -313,6 +319,28 @@ final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
         return serdeContext.getRexBuilder().makePatternFieldRef(alpha, fieldType, inputIndex);
     }
 
+    private static RexNode deserializeTableArgCall(JsonNode jsonNode, SerdeContext serdeContext) {
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType callType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext);
+
+        final int inputIndex = jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+
+        final JsonNode partitionKeysNode = jsonNode.required(FIELD_NAME_PARTITION_KEYS);
+        final int[] partitionKeys = new int[partitionKeysNode.size()];
+        for (int i = 0; i < partitionKeysNode.size(); ++i) {
+            partitionKeys[i] = partitionKeysNode.get(i).asInt();
+        }
+
+        final JsonNode orderKeysNode = jsonNode.required(FIELD_NAME_ORDER_KEYS);
+        final int[] orderKeys = new int[orderKeysNode.size()];
+        for (int i = 0; i < orderKeysNode.size(); ++i) {
+            orderKeys[i] = orderKeysNode.get(i).asInt();
+        }
+
+        return new RexTableArgCall(callType, inputIndex, partitionKeys, orderKeys);
+    }
+
     private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext serdeContext)
             throws IOException {
         final SqlOperator operator = deserializeSqlOperator(jsonNode, serdeContext);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index f559f6445e5..facb4a11158 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -33,6 +34,7 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
@@ -81,10 +82,10 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
     static final String FIELD_NAME_VALUE = "value";
     static final String FIELD_NAME_TYPE = "type";
     static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
 
     // INPUT_REF
     static final String KIND_INPUT_REF = "INPUT_REF";
-    static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
 
     // LITERAL
     static final String KIND_LITERAL = "LITERAL";
@@ -122,6 +123,11 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
     static final String FIELD_NAME_SQL_KIND = "sqlKind";
     static final String FIELD_NAME_CLASS = "class";
 
+    // TABLE_ARG_CALL
+    static final String KIND_TABLE_ARG_CALL = "TABLE_ARG_CALL";
+    static final String FIELD_NAME_PARTITION_KEYS = "partitionKeys";
+    static final String FIELD_NAME_ORDER_KEYS = "orderKeys";
+
     RexNodeJsonSerializer() {
         super(RexNode.class);
     }
@@ -154,7 +160,10 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
                         (RexPatternFieldRef) rexNode, jsonGenerator, serializerProvider);
                 break;
             default:
-                if (rexNode instanceof RexCall) {
+                if (rexNode instanceof RexTableArgCall) {
+                    serializeTableArgCall(
+                            (RexTableArgCall) rexNode, jsonGenerator, serializerProvider);
+                } else if (rexNode instanceof RexCall) {
                     serializeCall(
                             (RexCall) rexNode,
                             jsonGenerator,
@@ -323,6 +332,20 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
         gen.writeEndObject();
     }
 
+    private static void serializeTableArgCall(
+            RexTableArgCall tableArgCall, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
+        gen.writeStartObject();
+        gen.writeStringField(FIELD_NAME_KIND, KIND_TABLE_ARG_CALL);
+        gen.writeNumberField(FIELD_NAME_INPUT_INDEX, tableArgCall.getInputIndex());
+        gen.writeFieldName(FIELD_NAME_PARTITION_KEYS);
+        gen.writeArray(tableArgCall.getPartitionKeys(), 0, tableArgCall.getPartitionKeys().length);
+        gen.writeFieldName(FIELD_NAME_ORDER_KEYS);
+        gen.writeArray(tableArgCall.getOrderKeys(), 0, tableArgCall.getOrderKeys().length);
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, tableArgCall.getType(), gen);
+        gen.writeEndObject();
+    }
+
+
     private static void serializeCall(
             RexCall call,
             JsonGenerator gen,
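
Taken together, serializer and deserializer imply a JSON shape roughly like the
following for a table argument call (a sketch with invented values; the "type"
payload follows the RelDataType serde and is elided):

    // {
    //   "kind": "TABLE_ARG_CALL",
    //   "inputIndex": 0,
    //   "partitionKeys": [1],
    //   "orderKeys": [],
    //   "type": { ... }
    // }
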
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
new file mode 100644
index 00000000000..50a8a0f6108
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * {@link StreamExecNode} for {@link ProcessTableFunction}.
+ *
+ * <p>A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple
+ * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in
+ * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying
+ * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table.
+ */
+@ExecNodeMetadata(
+        name = "stream-exec-process-table-function",
+        version = 1,
+        producedTransformations = StreamExecProcessTableFunction.PROCESS_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
+public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
+
+    public static final String PROCESS_TRANSFORMATION = "process";
+
+    public static final String FIELD_NAME_UID = "uid";
+    public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+    public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes";
+
+    @JsonProperty(FIELD_NAME_UID)
+    private final String uid;
+
+    @JsonProperty(FIELD_NAME_FUNCTION_CALL)
+    private final RexCall invocation;
+
+    @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
+    private final List<ChangelogMode> inputChangelogModes;
+
+    public StreamExecProcessTableFunction(
+            ReadableConfig tableConfig,
+            List<InputProperty> inputProperties,
+            RowType outputType,
+            String description,
+            String uid,
+            RexCall invocation,
+            List<ChangelogMode> inputChangelogModes) {
+        this(
+                ExecNodeContext.newNodeId(),
+                ExecNodeContext.newContext(StreamExecProcessTableFunction.class),
+                ExecNodeContext.newPersistedConfig(
+                        StreamExecProcessTableFunction.class, tableConfig),
+                inputProperties,
+                outputType,
+                description,
+                uid,
+                invocation,
+                inputChangelogModes);
+    }
+
+    @JsonCreator
+    public StreamExecProcessTableFunction(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
+            @JsonProperty(FIELD_NAME_UID) String uid,
+            @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+            @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
+                    List<ChangelogMode> inputChangelogModes) {
+        super(id, context, persistedConfig, inputProperties, outputType, description);
+        this.uid = uid;
+        this.invocation = (RexCall) invocation;
+        this.inputChangelogModes = inputChangelogModes;
+    }
+
+    public String getUid() {
+        return uid;
+    }
+
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfig config) {
+        throw new TableException("Process table function is not fully supported yet.");
+    }
+}
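
The three @JsonProperty fields determine what a compiled plan persists for this
node; sketched below with invented values:

    // "uid"                 : "my-ptf"   -- the validated unique identifier
    // "functionCall"        : { ... }    -- the RexCall invocation, via RexNodeJsonSerializer
    // "inputChangelogModes" : [ ... ]    -- one ChangelogMode per input table
    //
    // Runtime translation is intentionally still a stub: translateToPlanInternal
    // throws "Process table function is not fully supported yet."
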
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
new file mode 100644
index 00000000000..19287e08287
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.TemporalTableFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Subclass of {@link TableFunctionScan} that is a relational expression which calls a {@link
+ * FunctionKind#TABLE} or {@link FunctionKind#PROCESS_TABLE} in Flink.
+ */
+@Internal
+public class FlinkLogicalTableFunctionScan extends TableFunctionScan implements FlinkLogicalRel {
+
+    public static final Converter CONVERTER =
+            new Converter(
+                    ConverterRule.Config.INSTANCE.withConversion(
+                            LogicalTableFunctionScan.class,
+                            Convention.NONE,
+                            FlinkConventions.LOGICAL(),
+                            "FlinkLogicalTableFunctionScanConverter"));
+
+    public FlinkLogicalTableFunctionScan(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelNode> inputs,
+            RexNode rexCall,
+            @Nullable Type elementType,
+            RelDataType rowType,
+            @Nullable Set<RelColumnMapping> columnMappings) {
+        super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings);
+    }
+
+    @Override
+    public TableFunctionScan copy(
+            RelTraitSet traitSet,
+            List<RelNode> inputs,
+            RexNode rexCall,
+            @Nullable Type elementType,
+            RelDataType rowType,
+            @Nullable Set<RelColumnMapping> columnMappings) {
+        return new FlinkLogicalTableFunctionScan(
+                getCluster(), traitSet, inputs, rexCall, elementType, rowType, columnMappings);
+    }
+
+    @Internal
+    public static class Converter extends ConverterRule {
+
+        protected Converter(Config config) {
+            super(config);
+        }
+
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+            final LogicalTableFunctionScan functionScan = call.rel(0);
+            final FunctionDefinition functionDefinition =
+                    ShortcutUtils.unwrapFunctionDefinition(functionScan.getCall());
+            if (functionDefinition == null) {
+                // For Calcite stack functions
+                return true;
+            }
+            final boolean isTableFunction =
+                    functionDefinition.getKind() == FunctionKind.TABLE
+                            || functionDefinition.getKind() == FunctionKind.PROCESS_TABLE;
+            return isTableFunction && !(functionDefinition instanceof TemporalTableFunction);
+        }
+
+        @Override
+        public @Nullable RelNode convert(RelNode rel) {
+            final LogicalTableFunctionScan functionScan = (LogicalTableFunctionScan) rel;
+            final RelTraitSet traitSet =
+                    rel.getTraitSet().replace(FlinkConventions.LOGICAL()).simplify();
+            final List<RelNode> newInputs =
+                    functionScan.getInputs().stream()
+                            .map(input -> RelOptRule.convert(input, FlinkConventions.LOGICAL()))
+                            .collect(Collectors.toList());
+            final RexCall rexCall = (RexCall) functionScan.getCall();
+            return new FlinkLogicalTableFunctionScan(
+                    functionScan.getCluster(),
+                    traitSet,
+                    newInputs,
+                    rexCall,
+                    functionScan.getElementType(),
+                    functionScan.getRowType(),
+                    functionScan.getColumnMappings());
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
new file mode 100644
index 00000000000..49932263d3c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.inference.StaticArgument;
+import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.SystemTypeInference;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+/**
+ * {@link StreamPhysicalRel} node for {@link ProcessTableFunction}.
+ *
+ * <p>A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple
+ * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in
+ * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying
+ * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table.
+ */
+public class StreamPhysicalProcessTableFunction extends AbstractRelNode
+        implements StreamPhysicalRel {
+
+    private final FlinkLogicalTableFunctionScan scan;
+    private final String uid;
+
+    private List<RelNode> inputs;
+
+    public StreamPhysicalProcessTableFunction(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelNode> inputs,
+            FlinkLogicalTableFunctionScan scan,
+            RelDataType rowType) {
+        super(cluster, traitSet);
+        this.inputs = inputs;
+        this.rowType = rowType;
+        this.scan = scan;
+        this.uid = deriveUniqueIdentifier(scan);
+    }
+
+    public StreamPhysicalProcessTableFunction(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode input,
+            FlinkLogicalTableFunctionScan scan,
+            RelDataType rowType) {
+        this(cluster, traitSet, List.of(input), scan, rowType);
+    }
+
+    @Override
+    public boolean requireWatermark() {
+        // Even if there is no time attribute in the inputs, PTFs can work with event-time by taking
+        // the watermark value as timestamp.
+        return true;
+    }
+
+    @Override
+    public List<RelNode> getInputs() {
+        return inputs;
+    }
+
+    @Override
+    public void replaceInput(int ordinalInParent, RelNode p) {
+        final List<RelNode> newInputs = new ArrayList<>(inputs);
+        newInputs.set(ordinalInParent, p);
+        inputs = List.copyOf(newInputs);
+        recomputeDigest();
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new StreamPhysicalProcessTableFunction(
+                getCluster(), traitSet, inputs, scan, getRowType());
+    }
+
+    @Override
+    public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        final double elementRate = 100.0d * getInputs().size();
+        return planner.getCostFactory().makeCost(elementRate, elementRate, 0);
+    }
+
+    @Override
+    public ExecNode<?> translateToExecNode() {
+        final List<ChangelogMode> inputChangelogModes =
+                getInputs().stream()
+                        .map(StreamPhysicalRel.class::cast)
+                        .map(ChangelogPlanUtils::getChangelogMode)
+                        .map(JavaScalaConversionUtil::toJava)
+                        .map(optional -> optional.orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        return new StreamExecProcessTableFunction(
+                unwrapTableConfig(this),
+                getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()),
+                FlinkTypeFactory.toLogicalRowType(rowType),
+                getRelDetailedDescription(),
+                uid,
+                (RexCall) scan.getCall(),
+                inputChangelogModes);
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        super.explainTerms(pw);
+        for (Ord<RelNode> ord : Ord.zip(inputs)) {
+            pw.input("input#" + ord.i, ord.e);
+        }
+        return pw.item("invocation", scan.getCall())
+                .item("uid", uid)
+                .item("select", String.join(",", getRowType().getFieldNames()))
+                .item("rowType", getRowType());
+    }
+
+    @Override
+    protected RelDataType deriveRowType() {
+        return rowType;
+    }
+
+    public List<StaticArgument> getProvidedInputArgs() {
+        final RexCall call = (RexCall) scan.getCall();
+        final List<RexNode> operands = call.getOperands();
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) call.getOperator();
+        final List<StaticArgument> declaredArgs =
+                function.getTypeInference()
+                        .getStaticArguments()
+                        .orElseThrow(IllegalStateException::new);
+        // This logic filters out optional tables for which an input is missing. It returns tables
+        // in the same order as provided inputs of this RelNode.
+        return Ord.zip(declaredArgs).stream()
+                .filter(arg -> arg.e.is(StaticArgumentTrait.TABLE))
+                .filter(arg -> operands.get(arg.i) instanceof RexTableArgCall)
+                .map(arg -> arg.e)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * An important part of {@link ProcessTableFunction} is the mandatory unique identifier. Even if
+     * the PTF has no state entries, state or timers might be added later. So a PTF should serve as
+     * an identifiable black box for the optimizer. UIDs ensure that.
+     *
+     * @see SystemTypeInference
+     */
+    private static String deriveUniqueIdentifier(FlinkLogicalTableFunctionScan scan) {
+        final RexCall rexCall = (RexCall) scan.getCall();
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) rexCall.getOperator();
+        final ContextResolvedFunction resolvedFunction = function.getResolvedFunction();
+        final List<RexNode> operands = rexCall.getOperands();
+        // Type inference ensures that uid is always added at the end
+        final RexNode uidRexNode = operands.get(operands.size() - 1);
+        if (uidRexNode.getKind() == SqlKind.DEFAULT) {
+            final String uid =
+                    resolvedFunction
+                            .getIdentifier()
+                            .map(FunctionIdentifier::getFunctionName)
+                            .orElse("");
+            if (!SystemTypeInference.isValidUidForProcessTableFunction(uid)) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not derive a unique identifier for process table function '%s'. "
+                                        + "The function's name does not qualify for a UID. Please provide "
+                                        + "a custom identifier using the implicit `uid` argument. "
+                                        + "For example: myFunction(..., uid => 'my-id')",
+                                resolvedFunction.asSummaryString()));
+            }
+            return uid;
+        }
+        // Otherwise UID should be correct as it has been checked by SystemTypeInference.
+        return RexLiteral.stringValue(uidRexNode);
+    }
+}
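
The derivation logic can be summarized with two invented calls to a PTF
registered as f:

    // f(r => TABLE t)                  -> uid "f": falls back to the function name,
    //                                    which must match [a-zA-Z_][a-zA-Z-_0-9]*
    // f(r => TABLE t, uid => 'my-id')  -> uid "my-id": an explicit literal wins
    // An anonymous/inline function without a uid argument cannot derive a valid
    // uid and hits the ValidationException above.
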
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java
new file mode 100644
index 00000000000..fd33940ee9a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.TableCharacteristic;
+import org.apache.calcite.sql.TableCharacteristic.Semantics;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Rule to convert a {@link FlinkLogicalTableFunctionScan} with table arguments into a {@link
+ * StreamPhysicalProcessTableFunction}.
+ */
+public class StreamPhysicalProcessTableFunctionRule extends ConverterRule {
+
+    public static final StreamPhysicalProcessTableFunctionRule INSTANCE =
+            new StreamPhysicalProcessTableFunctionRule(
+                    Config.INSTANCE.withConversion(
+                            FlinkLogicalTableFunctionScan.class,
+                            FlinkConventions.LOGICAL(),
+                            FlinkConventions.STREAM_PHYSICAL(),
+                            "StreamPhysicalProcessTableFunctionRule"));
+
+    private StreamPhysicalProcessTableFunctionRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
+        if (scan.getInputs().isEmpty()) {
+            // Let StreamPhysicalConstantTableFunctionScanRule take over
+            return false;
+        }
+        final RexCall rexCall = (RexCall) scan.getCall();
+        final FunctionDefinition definition = ShortcutUtils.unwrapFunctionDefinition(rexCall);
+        return definition != null && definition.getKind() == FunctionKind.PROCESS_TABLE;
+    }
+
+    @Override
+    public @Nullable RelNode convert(RelNode rel) {
+        final FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) rel;
+        final RexCall rexCall = (RexCall) scan.getCall();
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) rexCall.getOperator();
+        final List<RexNode> operands = rexCall.getOperands();
+        final List<RelNode> newInputs =
+                applyDistributionOnInputs(function, operands, rel.getInputs());
+        final RelTraitSet providedTraitSet =
+                rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+        return new StreamPhysicalProcessTableFunction(
+                scan.getCluster(), providedTraitSet, newInputs, scan, scan.getRowType());
+    }
+
+    private static List<RelNode> applyDistributionOnInputs(
+            BridgingSqlFunction.WithTableFunction function,
+            List<RexNode> operands,
+            List<RelNode> inputs) {
+        return Ord.zip(operands).stream()
+                .filter(operand -> operand.e instanceof RexTableArgCall)
+                .map(
+                        tableOperand -> {
+                            final int pos = tableOperand.i;
+                            final RexTableArgCall tableArgCall = (RexTableArgCall) tableOperand.e;
+                            final TableCharacteristic tableCharacteristic =
+                                    function.tableCharacteristic(pos);
+                            assert tableCharacteristic != null;
+                            return applyDistributionOnInput(
+                                    tableArgCall,
+                                    tableCharacteristic,
+                                    inputs.get(tableArgCall.getInputIndex()));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private static RelNode applyDistributionOnInput(
+            RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic, RelNode input) {
+        final FlinkRelDistribution requiredDistribution =
+                deriveDistribution(tableOperand, tableCharacteristic);
+        final RelTraitSet requiredTraitSet =
+                input.getCluster()
+                        .getPlanner()
+                        .emptyTraitSet()
+                        .replace(requiredDistribution)
+                        .replace(FlinkConventions.STREAM_PHYSICAL());
+        return RelOptRule.convert(input, requiredTraitSet);
+    }
+
+    private static FlinkRelDistribution deriveDistribution(
+            RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic) {
+        if (tableCharacteristic.semantics == Semantics.SET) {
+            final int[] partitionKeys = tableOperand.getPartitionKeys();
+            if (partitionKeys.length == 0) {
+                return FlinkRelDistribution.SINGLETON();
+            }
+            return FlinkRelDistribution.hash(partitionKeys, true);
+        }
+        return FlinkRelDistribution.DEFAULT();
+    }
+}
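
The derived input distribution follows the table semantics; summarized with
invented SQL fragments:

    // f(r => TABLE t PARTITION BY k)  -- SET semantics with keys  -> FlinkRelDistribution.hash(k)
    // f(r => TABLE t)                 -- SET semantics, no keys   -> FlinkRelDistribution.SINGLETON()
    // TABLE_AS_ROW argument           -- ROW semantics            -> FlinkRelDistribution.DEFAULT()
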
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java
index d6113743f3e..c88d132b4c3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.table.planner.plan.rules.physical.batch;
 
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCorrelate;
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalValues;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexUtil;
 import org.immutables.value.Value;
@@ -35,20 +40,21 @@ import org.immutables.value.Value;
 import scala.Option;
 
 /**
- * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall to
+ * Converts {@link FlinkLogicalTableFunctionScan} with constant parameters. Add the rule to support
+ * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}.
+ *
+ * <p>For {@link FunctionKind#TABLE}:
  *
  * <pre>
- *                            {@link BatchPhysicalCorrelate}
- *                                   /               \
- * empty {@link BatchPhysicalValuesRule}}     {@link FlinkLogicalTableFunctionScan}.
+ *   empty {@link BatchPhysicalValues} -> {@link BatchPhysicalCorrelate}
  * </pre>
  *
- * <p>Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT *
- * FROM LATERAL TABLE(func()) as T(c)}
+ * <p>{@link BatchPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL
+ * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}.
+ *
+ * <p>For {@link FunctionKind#PROCESS_TABLE}:
  *
- * <p>Note: {@link BatchPhysicalCorrelateRule} is responsible for converting a reasonable physical
- * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T,
- * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}
+ * <p>{@link FunctionKind#PROCESS_TABLE} is currently unsupported.
  */
 @Value.Enclosing
 public class BatchPhysicalConstantTableFunctionScanRule
@@ -65,18 +71,17 @@ public class BatchPhysicalConstantTableFunctionScanRule
     }
 
     public boolean matches(RelOptRuleCall call) {
-        FlinkLogicalTableFunctionScan scan = call.rel(0);
-        return RexUtil.isConstant(scan.getCall()) && scan.getInputs().isEmpty();
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
+        return !RexUtil.containsInputRef(scan.getCall()) && scan.getInputs().isEmpty();
     }
 
     public void onMatch(RelOptRuleCall call) {
-        FlinkLogicalTableFunctionScan scan = call.rel(0);
-
-        // create correlate left
-        RelOptCluster cluster = scan.getCluster();
-        RelTraitSet traitSet =
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
+        final RelOptCluster cluster = scan.getCluster();
+        final RelTraitSet traitSet =
                 call.getPlanner().emptyTraitSet().replace(FlinkConventions.BATCH_PHYSICAL());
-        BatchPhysicalValues values =
+
+        final BatchPhysicalValues values =
                 new BatchPhysicalValues(
                         cluster,
                         traitSet,
@@ -84,35 +89,37 @@ public class BatchPhysicalConstantTableFunctionScanRule
                         cluster.getTypeFactory()
                                 .createStructType(ImmutableList.of(), ImmutableList.of()));
 
-        BatchPhysicalCorrelate correlate =
-                new BatchPhysicalCorrelate(
-                        cluster,
-                        traitSet,
-                        values,
-                        scan,
-                        Option.empty(),
-                        scan.getRowType(),
-                        JoinRelType.INNER);
-        call.transformTo(correlate);
+        final FunctionDefinition function = ShortcutUtils.unwrapFunctionDefinition(scan.getCall());
+        assert function != null;
+        final RelNode replacement;
+        if (function.getKind() == FunctionKind.TABLE) {
+            replacement =
+                    new BatchPhysicalCorrelate(
+                            cluster,
+                            traitSet,
+                            values,
+                            scan,
+                            Option.empty(),
+                            scan.getRowType(),
+                            JoinRelType.INNER);
+        } else {
+            throw new TableException("Unsupported function for scan:" + function.getKind());
+        }
+
+        call.transformTo(replacement);
     }
 
     /** Configuration for {@link BatchPhysicalConstantTableFunctionScanRule}. */
-    @Value.Immutable(singleton = false)
+    @Value.Immutable
     public interface BatchPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config {
-        BatchPhysicalConstantTableFunctionScanRule.BatchPhysicalConstantTableFunctionScanRuleConfig
-                DEFAULT =
-                        ImmutableBatchPhysicalConstantTableFunctionScanRule
-                                .BatchPhysicalConstantTableFunctionScanRuleConfig.builder()
-                                .build()
-                                .withOperandSupplier(
-                                        b0 ->
-                                                b0.operand(FlinkLogicalTableFunctionScan.class)
-                                                        .anyInputs())
-                                .withDescription("BatchPhysicalConstantTableFunctionScanRule")
-                                .as(
-                                        BatchPhysicalConstantTableFunctionScanRule
-                                                .BatchPhysicalConstantTableFunctionScanRuleConfig
-                                                .class);
+        BatchPhysicalConstantTableFunctionScanRuleConfig DEFAULT =
+                ImmutableBatchPhysicalConstantTableFunctionScanRule
+                        .BatchPhysicalConstantTableFunctionScanRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 -> b0.operand(FlinkLogicalTableFunctionScan.class).anyInputs())
+                        .withDescription("BatchPhysicalConstantTableFunctionScanRule")
+                        .as(BatchPhysicalConstantTableFunctionScanRuleConfig.class);
 
         @Override
         default BatchPhysicalConstantTableFunctionScanRule toRule() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java
index 385b34e0457..df177f7bdbc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java
@@ -18,16 +18,22 @@
 
 package org.apache.flink.table.planner.plan.rules.physical.stream;
 
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalValues;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexUtil;
 import org.immutables.value.Value;
@@ -35,20 +41,26 @@ import org.immutables.value.Value;
 import scala.Option;
 
 /**
- * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall. To
+ * Converts {@link FlinkLogicalTableFunctionScan} with constant parameters. Add the rule to support
+ * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}.
+ *
+ * <p>For {@link org.apache.flink.table.functions.FunctionKind#TABLE}:
  *
  * <pre>
- *                           {@link StreamPhysicalCorrelate}
- *                              /                     \
- *       empty {@link StreamPhysicalValues}  {@link FlinkLogicalTableFunctionScan}
+ *   empty {@link StreamPhysicalValues} -> {@link StreamPhysicalCorrelate}
  * </pre>
  *
- * <p>Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT *
- * FROM LATERAL TABLE(func()) as T(c)}
+ * <p>{@link StreamPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL
+ * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}.
+ *
+ * <p>For {@link org.apache.flink.table.functions.FunctionKind#PROCESS_TABLE}:
+ *
+ * <pre>
+ *   empty {@link StreamPhysicalValues} -> {@link StreamPhysicalProcessTableFunction}
+ * </pre>
  *
- * <p>Note: @{link StreamPhysicalCorrelateRule} is responsible for converting a reasonable physical
- * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T,
- * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}
+ * <p>{@link StreamPhysicalProcessTableFunction} powers queries such as {@code SELECT * FROM func(t
+ * => TABLE T)} or {@code SELECT * FROM func(t => TABLE T PARTITION BY k)}.
  */
 @Value.Enclosing
 public class StreamPhysicalConstantTableFunctionScanRule
@@ -67,18 +79,17 @@ public class StreamPhysicalConstantTableFunctionScanRule
     }
 
     public boolean matches(RelOptRuleCall call) {
-        FlinkLogicalTableFunctionScan scan = call.rel(0);
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
         return !RexUtil.containsInputRef(scan.getCall()) && scan.getInputs().isEmpty();
     }
 
     public void onMatch(RelOptRuleCall call) {
-        FlinkLogicalTableFunctionScan scan = call.rel(0);
-
-        // create correlate left
-        RelOptCluster cluster = scan.getCluster();
-        RelTraitSet traitSet =
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
+        final RelOptCluster cluster = scan.getCluster();
+        final RelTraitSet traitSet =
                 call.getPlanner().emptyTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
-        StreamPhysicalValues values =
+
+        final StreamPhysicalValues values =
                 new StreamPhysicalValues(
                         cluster,
                         traitSet,
@@ -86,20 +97,32 @@ public class StreamPhysicalConstantTableFunctionScanRule
                         cluster.getTypeFactory()
                                 .createStructType(ImmutableList.of(), ImmutableList.of()));
 
-        StreamPhysicalCorrelate correlate =
-                new StreamPhysicalCorrelate(
-                        cluster,
-                        traitSet,
-                        values,
-                        scan,
-                        Option.empty(),
-                        scan.getRowType(),
-                        JoinRelType.INNER);
-        call.transformTo(correlate);
+        final FunctionDefinition function = ShortcutUtils.unwrapFunctionDefinition(scan.getCall());
+        assert function != null;
+        final RelNode replacement;
+        if (function.getKind() == FunctionKind.TABLE) {
+            replacement =
+                    new StreamPhysicalCorrelate(
+                            cluster,
+                            traitSet,
+                            values,
+                            scan,
+                            Option.empty(),
+                            scan.getRowType(),
+                            JoinRelType.INNER);
+        } else if (function.getKind() == FunctionKind.PROCESS_TABLE) {
+            replacement =
+                    new StreamPhysicalProcessTableFunction(
+                            cluster, traitSet, values, scan, scan.getRowType());
+        } else {
+            throw new TableException("Unsupported function for scan:" + function.getKind());
+        }
+
+        call.transformTo(replacement);
     }
 
     /** Configuration for {@link StreamPhysicalConstantTableFunctionScanRule}. */
-    @Value.Immutable(singleton = false)
+    @Value.Immutable
     public interface StreamPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config {
         StreamPhysicalConstantTableFunctionScanRule
                         .StreamPhysicalConstantTableFunctionScanRuleConfig
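
For FunctionKind#PROCESS_TABLE, the rule above only fires for constant calls, i.e. PTFs invoked without any table argument. A minimal sketch of such a function, following the testing conventions of ProcessTableFunctionTest further below (the eval signature is an assumption inferred from the `f(i => 1, b => true)` calls there):

    // Sketch: a PTF with only scalar arguments is planned by this rule as
    // ProcessTableFunction over an empty Values node (see testScalarArgsNoUid).
    // Nested inside the test class, as the other testing functions are.
    public static class ScalarArgsFunction extends ProcessTableFunction<String> {
        public void eval(Integer i, Boolean b) {}
    }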
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index a19fc0dbaf9..484d0883be1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -76,6 +76,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatch
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultipleInput;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
@@ -124,7 +125,7 @@ public final class ExecNodeMetadataUtil {
     }
 
     private static final Set<Class<? extends ExecNode<?>>> EXEC_NODES =
-            new HashSet<Class<? extends ExecNode<?>>>() {
+            new HashSet<>() {
                 {
                     add(StreamExecCalc.class);
                     add(StreamExecChangelogNormalize.class);
@@ -164,6 +165,7 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecWindowTableFunction.class);
                     add(StreamExecPythonCalc.class);
                     add(StreamExecAsyncCalc.class);
+                    add(StreamExecProcessTableFunction.class);
                     add(StreamExecPythonCorrelate.class);
                     add(StreamExecPythonGroupAggregate.class);
                     add(StreamExecPythonGroupWindowAggregate.class);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index 522b3731919..a025eacf3be 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
 
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
@@ -140,6 +141,10 @@ public final class ShortcutUtils {
         }
         final RexCall call = (RexCall) rexNode;
         if (!(call.getOperator() instanceof BridgingSqlFunction)) {
+            // legacy
+            if (call.getOperator() instanceof TableSqlFunction) {
+                return ((TableSqlFunction) call.getOperator()).udtf();
+            }
             return null;
         }
         return ((BridgingSqlFunction) call.getOperator()).getDefinition();
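
A hedged usage sketch for the extended unwrapFunctionDefinition: calls backed by either a BridgingSqlFunction or the legacy TableSqlFunction now yield a FunctionDefinition, anything else yields null. This mirrors how the scan rules above branch on the function kind:

    // Sketch only: the null check matters because non-Flink operators
    // (plain Calcite functions) still return null here.
    final FunctionDefinition function =
            ShortcutUtils.unwrapFunctionDefinition(scan.getCall());
    if (function != null && function.getKind() == FunctionKind.PROCESS_TABLE) {
        // translate into StreamPhysicalProcessTableFunction
    }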
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
deleted file mode 100644
index ca664d7b63e..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.nodes.logical
-
-import org.apache.flink.table.functions.TemporalTableFunction
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.convert.ConverterRule.Config
-import org.apache.calcite.rel.core.TableFunctionScan
-import org.apache.calcite.rel.logical.LogicalTableFunctionScan
-import org.apache.calcite.rel.metadata.RelColumnMapping
-import org.apache.calcite.rex.{RexCall, RexNode}
-
-import java.lang.reflect.Type
-import java.util
-
-import scala.collection.JavaConversions._
-
-/**
- * Sub-class of [[TableFunctionScan]] that is a relational expression which calls a table-valued
- * function in Flink.
- */
-class FlinkLogicalTableFunctionScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputs: util.List[RelNode],
-    rexCall: RexNode,
-    elementType: Type,
-    rowType: RelDataType,
-    columnMappings: util.Set[RelColumnMapping])
-  extends TableFunctionScan(
-    cluster,
-    traitSet,
-    inputs,
-    rexCall,
-    elementType,
-    rowType,
-    columnMappings)
-  with FlinkLogicalRel {
-
-  override def copy(
-      traitSet: RelTraitSet,
-      inputs: util.List[RelNode],
-      rexCall: RexNode,
-      elementType: Type,
-      rowType: RelDataType,
-      columnMappings: util.Set[RelColumnMapping]): TableFunctionScan = {
-
-    new FlinkLogicalTableFunctionScan(
-      cluster,
-      traitSet,
-      inputs,
-      rexCall,
-      elementType,
-      rowType,
-      columnMappings)
-  }
-
-}
-
-class FlinkLogicalTableFunctionScanConverter(config: Config) extends ConverterRule(config) {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val logicalTableFunction: LogicalTableFunctionScan = call.rel(0)
-
-    !isTemporalTableFunctionCall(logicalTableFunction)
-  }
-
-  private def isTemporalTableFunctionCall(
-      logicalTableFunction: LogicalTableFunctionScan): Boolean = {
-
-    if (!logicalTableFunction.getCall.isInstanceOf[RexCall]) {
-      return false
-    }
-    val rexCall = logicalTableFunction.getCall.asInstanceOf[RexCall]
-    val functionDefinition = rexCall.getOperator match {
-      case tsf: TableSqlFunction => tsf.udtf
-      case bsf: BridgingSqlFunction => bsf.getDefinition
-      case _ => return false
-    }
-    functionDefinition.isInstanceOf[TemporalTableFunction]
-  }
-
-  def convert(rel: RelNode): RelNode = {
-    val scan = rel.asInstanceOf[LogicalTableFunctionScan]
-    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
-    val newInputs = scan.getInputs.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL))
-    val rexCall = scan.getCall.asInstanceOf[RexCall];
-    val builder = rel.getCluster.getRexBuilder
-    // When rexCall uses NamedArguments, RexCall is not inferred with the correct type.
-    // We just use the type of scan as the type of RexCall.
-    val newCall = rexCall.clone(rel.getRowType, rexCall.getOperands)
-
-    new FlinkLogicalTableFunctionScan(
-      scan.getCluster,
-      traitSet,
-      newInputs,
-      newCall,
-      scan.getElementType,
-      scan.getRowType,
-      scan.getColumnMappings
-    )
-  }
-
-}
-
-object FlinkLogicalTableFunctionScan {
-  val CONVERTER = new FlinkLogicalTableFunctionScanConverter(
-    Config.INSTANCE.withConversion(
-      classOf[LogicalTableFunctionScan],
-      Convention.NONE,
-      FlinkConventions.LOGICAL,
-      "FlinkLogicalTableFunctionScanConverter"))
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 681dee6e766..e6d2892eab6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFast
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType
+import org.apache.flink.table.types.inference.{StaticArgument, StaticArgumentTrait}
 import org.apache.flink.types.RowKind
 
 import org.apache.calcite.rel.RelNode
@@ -329,6 +330,29 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         val providedTrait = new ModifyKindSetTrait(scan.intermediateTable.modifyKindSet)
         createNewNode(scan, List(), providedTrait, requiredTrait, requester)
 
+      case process: StreamPhysicalProcessTableFunction =>
+        // Accepted changes depend on input argument declaration
+        val requiredChildrenTraits = process.getProvidedInputArgs
+          .map(
+            arg =>
+              if (arg.is(StaticArgumentTrait.SUPPORT_UPDATES)) {
+                ModifyKindSetTrait.ALL_CHANGES
+              } else {
+                ModifyKindSetTrait.INSERT_ONLY
+              })
+          .toList
+
+        val children = if (requiredChildrenTraits.isEmpty) {
+          // Constant function has a single StreamPhysicalValues input
+          visitChildren(process, ModifyKindSetTrait.INSERT_ONLY)
+        } else {
+          visitChildren(process, requiredChildrenTraits)
+        }
+
+        // Currently, PTFs will only output insert-only
+        val providedTrait = ModifyKindSetTrait.INSERT_ONLY
+        createNewNode(process, children, providedTrait, requiredTrait, requester)
+
       case _ =>
         throw new UnsupportedOperationException(
           s"Unsupported visit for ${rel.getClass.getSimpleName}")
@@ -350,6 +374,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       newChildren.toList
     }
 
+    private def visitChildren(
+        parent: StreamPhysicalRel,
+        requiredChildrenTraits: List[ModifyKindSetTrait]): List[StreamPhysicalRel] = {
+      val requester = getNodeName(parent)
+      val newChildren = for (i <- 0 until parent.getInputs.size()) yield {
+        visitChild(parent, i, requiredChildrenTraits(i), requester)
+      }
+      newChildren.toList
+    }
+
     private def visitChild(
         parent: StreamPhysicalRel,
         childOrdinal: Int,
@@ -676,6 +710,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
             createNewNode(rel, Some(List()), providedTrait)
           }
 
+        case process: StreamPhysicalProcessTableFunction =>
+          // ProcessTableFunction currently only consumes retract or insert-only
+          val children = process.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
+                UpdateKindTrait.NONE
+              } else {
+                UpdateKindTrait.BEFORE_AND_AFTER
+              }
+              this.visit(child, requiredChildTrait)
+          }.toList
+          createNewNode(rel, Some(children.flatten), UpdateKindTrait.NONE)
+
         case _ =>
           throw new UnsupportedOperationException(
             s"Unsupported visit for ${rel.getClass.getSimpleName}")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index fb1fec0a1b6..b5d30afa20f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.plan.rules
 
 import org.apache.flink.table.planner.plan.nodes.logical._
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunctionRule
 import org.apache.flink.table.planner.plan.rules.logical._
 import org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule
 import org.apache.flink.table.planner.plan.rules.physical.stream._
@@ -465,6 +466,8 @@ object FlinkStreamRuleSets {
     ExpandWindowTableFunctionTransposeRule.INSTANCE,
     StreamPhysicalWindowRankRule.INSTANCE,
     StreamPhysicalWindowDeduplicateRule.INSTANCE,
+    // process table function
+    StreamPhysicalProcessTableFunctionRule.INSTANCE,
     // join
     StreamPhysicalJoinRule.INSTANCE,
     StreamPhysicalIntervalJoinRule.INSTANCE,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index 489f10a6123..e4baf0e0375 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
@@ -684,7 +685,17 @@ public class RexNodeJsonSerdeTest {
                         FlinkSqlOperatorTable.HASH_CODE,
                         rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)),
                 rexBuilder.makePatternFieldRef(
-                        "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 0));
+                        "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 0),
+                new RexTableArgCall(
+                        FACTORY.createStructType(
+                                StructKind.PEEK_FIELDS_NO_EXPAND,
+                                Arrays.asList(
+                                        FACTORY.createSqlType(SqlTypeName.VARCHAR),
+                                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
+                                Arrays.asList("f1", "f2")),
+                        0,
+                        new int[] {1},
+                        new int[] {0}));
     }
 
     // --------------------------------------------------------------------------------------------
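
A reading of the RexTableArgCall arguments in the serde test above, assuming the parameter order (type, inputIndex, partitionKeys, orderKeys) implied by the accessors getInputIndex() and getPartitionKeys() used in StreamPhysicalProcessTableFunctionRule earlier in this commit; the order-key slot is an assumption:

    new RexTableArgCall(
            rowType,        // RecordType(VARCHAR f1, INTEGER f2)
            0,              // index of the table argument among the scan's inputs
            new int[] {1},  // partition key columns, here f2
            new int[] {0}); // order key columns (assumed), here f1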
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index 8aeee6ca5c0..96b4e514775 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -46,6 +46,7 @@ import java.util.stream.Stream;
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
 import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
+import static org.apache.flink.table.annotation.ArgumentTrait.SUPPORT_UPDATES;
 import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW;
 import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -58,9 +59,18 @@ public class ProcessTableFunctionTest extends TableTestBase {
     @BeforeEach
     void setup() {
         util = streamTestUtil(TableConfig.getDefault());
-        util.tableEnv().executeSql("CREATE VIEW t1 AS SELECT 'Bob' AS name, 12 
AS score");
-        util.tableEnv().executeSql("CREATE VIEW t2 AS SELECT 'Bob' AS name, 12 
AS different");
-        util.tableEnv().executeSql("CREATE VIEW t3 AS SELECT 'Bob' AS name, 
TRUE AS isValid");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE VIEW t AS SELECT * FROM (VALUES ('Bob', 12), 
('Alice', 42)) AS T(name, score)");
+        util.tableEnv()
+                .executeSql("CREATE VIEW t_name_diff AS SELECT 'Bob' AS name, 
12 AS different");
+        util.tableEnv()
+                .executeSql("CREATE VIEW t_type_diff AS SELECT 'Bob' AS name, 
TRUE AS isValid");
+        util.tableEnv()
+                .executeSql("CREATE VIEW t_updating AS SELECT name, COUNT(*) 
FROM t GROUP BY name");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE t_sink (name STRING, data STRING) WITH 
('connector' = 'blackhole')");
     }
 
     @Test
@@ -86,13 +96,13 @@ public class ProcessTableFunctionTest extends TableTestBase {
     @Test
     void testTableAsRow() {
         util.addTemporarySystemFunction("f", TableAsRowFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)");
     }
 
     @Test
     void testTypedTableAsRow() {
         util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(u => TABLE t1, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(u => TABLE t, i => 1)");
     }
 
     @Test
@@ -100,25 +110,25 @@ public class ProcessTableFunctionTest extends TableTestBase {
         util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
         // function expects <STRING name, INT score>
         // but table is <STRING name, INT different>
-        assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(u => TABLE t_name_diff, i => 1)");
     }
 
     @Test
     void testTableAsSet() {
         util.addTemporarySystemFunction("f", TableAsSetFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY 
name, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY name, i 
=> 1)");
     }
 
     @Test
     void testTableAsSetOptionalPartitionBy() {
         util.addTemporarySystemFunction("f", 
TableAsSetOptionalPartitionFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)");
     }
 
     @Test
     void testTypedTableAsSet() {
         util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY 
name, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(u => TABLE t PARTITION BY name, i 
=> 1)");
     }
 
     @Test
@@ -131,27 +141,83 @@ public class ProcessTableFunctionTest extends TableTestBase {
     void testPojoArgs() {
         util.addTemporarySystemFunction("f", PojoArgsFunction.class);
         util.addTemporarySystemFunction("pojoCreator", 
PojoCreatingFunction.class);
-        assertReachesOptimizer(
-                "SELECT * FROM f(input => TABLE t1, scalar => 
pojoCreator('Bob', 12), uid => 'my-ptf')");
+        util.verifyRelPlan(
+                "SELECT * FROM f(input => TABLE t, scalar => 
pojoCreator('Bob', 12), uid => 'my-ptf')");
     }
 
     @Test
     void testTableAsSetPassThroughColumns() {
         util.addTemporarySystemFunction("f", 
TableAsSetPassThroughFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY 
name, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY name, i 
=> 1)");
     }
 
     @Test
     void testTableAsRowPassThroughColumns() {
         util.addTemporarySystemFunction("f", 
TableAsRowPassThroughFunction.class);
-        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+        util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)");
+    }
+
+    @Test
+    void testUpdatingInput() {
+        util.addTemporarySystemFunction("f", UpdatingArgFunction.class);
+        util.verifyRelPlan("SELECT * FROM f(r => TABLE t_updating PARTITION BY 
name, i => 1)");
+    }
+
+    @Test
+    void testMissingUid() {
+        // Function name contains special characters and can thus not be used as UID
+        util.addTemporarySystemFunction("f*", ScalarArgsFunction.class);
+        assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM `f*`(42, true)"))
+                .satisfies(
+                        anyCauseMatches(
+                                "Could not derive a unique identifier for 
process table function 'f*'. "
+                                        + "The function's name does not 
qualify for a UID. "
+                                        + "Please provide a custom identifier 
using the implicit `uid` argument. "
+                                        + "For example: myFunction(..., uid => 
'my-id')"));
+    }
+
+    @Test
+    void testUidPipelineSplitIntoTwoFunctions() {
+        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+        util.verifyExecPlan(
+                util.tableEnv()
+                        .createStatementSet()
+                        .addInsertSql(
+                                "INSERT INTO t_sink SELECT * FROM f(r => TABLE 
t PARTITION BY name, i => 1, uid => 'a')")
+                        .addInsertSql(
+                                "INSERT INTO t_sink SELECT * FROM f(r => TABLE 
t PARTITION BY name, i => 1, uid => 'b')"));
+    }
+
+    @Test
+    void testUidPipelineMergeIntoOneFunction() {
+        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+        util.verifyExecPlan(
+                util.tableEnv()
+                        .createStatementSet()
+                        .addInsertSql(
+                                "INSERT INTO t_sink SELECT * FROM f(r => TABLE 
t PARTITION BY name, i => 1, uid => 'same')")
+                        .addInsertSql(
+                                "INSERT INTO t_sink SELECT * FROM f(r => TABLE 
t PARTITION BY name, i => 1, uid => 'same')"));
+    }
+
+    @Test
+    void testUidPipelineMergeWithFanOut() {
+        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+
+        util.verifyExecPlan(
+                util.tableEnv()
+                        .createStatementSet()
+                        .addInsertSql(
+                                "INSERT INTO t_sink SELECT * FROM f(r => TABLE 
t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Bob'")
+                        .addInsertSql(
+                                "INSERT INTO t_sink SELECT * FROM f(r => TABLE 
t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Alice'"));
     }
 
     @ParameterizedTest
     @MethodSource("errorSpecs")
     void testErrorBehavior(ErrorSpec spec) {
         util.addTemporarySystemFunction("f", spec.functionClass);
-        assertThatThrownBy(() -> util.verifyRelPlan(spec.sql))
+        assertThatThrownBy(() -> util.verifyExecPlan(spec.sql))
                 .satisfies(anyCauseMatches(spec.errorMessage));
     }
 
@@ -162,31 +228,31 @@ public class ProcessTableFunctionTest extends TableTestBase {
                         ScalarArgsFunction.class,
                         "SELECT * FROM f(uid => '%', i => 1, b => true)",
                         "Invalid unique identifier for process table function. 
"
-                                + "The 'uid' argument must be a string literal 
that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
+                                + "The `uid` argument must be a string literal 
that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
                                 + "But found: %"),
                 ErrorSpec.of(
                         "typed table as row with invalid input",
                         TypedTableAsRowFunction.class,
                         // function expects <STRING name, INT score>
-                        "SELECT * FROM f(u => TABLE t3, i => 1)",
+                        "SELECT * FROM f(u => TABLE t_type_diff, i => 1)",
                         "No match found for function signature "
                                 + "f(<RecordType(CHAR(3) name, BOOLEAN 
isValid)>, <NUMERIC>, <CHARACTER>)"),
                 ErrorSpec.of(
                         "table as set with missing partition by",
                         TableAsSetFunction.class,
-                        "SELECT * FROM f(r => TABLE t1, i => 1)",
+                        "SELECT * FROM f(r => TABLE t, i => 1)",
                         "Table argument 'r' requires a PARTITION BY clause for 
parallel processing."),
                 ErrorSpec.of(
                         "typed table as set with invalid input",
                         TypedTableAsSetFunction.class,
                         // function expects <STRING name, INT score>
-                        "SELECT * FROM f(u => TABLE t3 PARTITION BY name, i => 
1)",
+                        "SELECT * FROM f(u => TABLE t_type_diff PARTITION BY 
name, i => 1)",
                         "No match found for function signature "
                                 + "f(<RecordType(CHAR(3) name, BOOLEAN 
isValid)>, <NUMERIC>, <CHARACTER>)"),
                 ErrorSpec.of(
                         "table function instead of process table function",
                         NoProcessTableFunction.class,
-                        "SELECT * FROM f(r => TABLE t1)",
+                        "SELECT * FROM f(r => TABLE t)",
                         "Only scalar arguments are supported at this location. 
"
                                 + "But argument 'r' declared the following 
traits: [TABLE, TABLE_AS_ROW]"),
                 ErrorSpec.of(
@@ -197,7 +263,7 @@ public class ProcessTableFunctionTest extends TableTestBase {
                 ErrorSpec.of(
                         "multiple table args",
                         MultiTableFunction.class,
-                        "SELECT * FROM f(r1 => TABLE t1, r2 => TABLE t1)",
+                        "SELECT * FROM f(r1 => TABLE t, r2 => TABLE t)",
                         "Currently, only signatures with at most one table 
argument are supported."),
                 ErrorSpec.of(
                         "row instead of table",
@@ -207,25 +273,36 @@ public class ProcessTableFunctionTest extends 
TableTestBase {
                 ErrorSpec.of(
                         "table as row partition by",
                         TableAsRowFunction.class,
-                        "SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 
1)",
+                        "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 
1)",
                         "Only tables with set semantics may be partitioned. "
                                 + "Invalid PARTITION BY clause in the 0-th 
operand of table function 'f'"),
                 ErrorSpec.of(
                         "invalid partition by clause",
                         TableAsSetFunction.class,
-                        "SELECT * FROM f(r => TABLE t1 PARTITION BY invalid, i 
=> 1)",
+                        "SELECT * FROM f(r => TABLE t PARTITION BY invalid, i 
=> 1)",
                         "Invalid column 'invalid' for PARTITION BY clause. 
Available columns are: [name, score]"),
                 ErrorSpec.of(
                         "unsupported order by",
                         TableAsSetFunction.class,
-                        "SELECT * FROM f(r => TABLE t1 PARTITION BY name ORDER 
BY score, i => 1)",
-                        "ORDER BY clause is currently not supported."));
-    }
-
-    private void assertReachesOptimizer(String sql) {
-        assertThatThrownBy(() -> util.verifyRelPlan(sql))
-                .hasMessageContaining(
-                        "This exception indicates that the query uses an 
unsupported SQL feature.");
+                        "SELECT * FROM f(r => TABLE t PARTITION BY name ORDER 
BY score, i => 1)",
+                        "ORDER BY clause is currently not supported."),
+                ErrorSpec.of(
+                        "updates into insert-only table arg",
+                        TableAsSetFunction.class,
+                        "SELECT * FROM f(r => TABLE t_updating PARTITION BY 
name, i => 1)",
+                        "StreamPhysicalProcessTableFunction doesn't support 
consuming update changes"),
+                ErrorSpec.of(
+                        "updates into POJO table arg",
+                        InvalidTypedUpdatingArgFunction.class,
+                        "SELECT * FROM f(r => TABLE t_updating, i => 1)",
+                        "Table arguments that support updates must use a row 
type."),
+                ErrorSpec.of(
+                        "uid conflict",
+                        TableAsSetFunction.class,
+                        "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 
42, uid => 'same') "
+                                + "UNION ALL SELECT * FROM f(r => TABLE t 
PARTITION BY name, i => 999, uid => 'same')",
+                        "Duplicate unique identifier 'same' detected among 
process table functions. "
+                                + "Make sure that all PTF calls have an 
identifier defined that is globally unique."));
     }
 
     /** Testing function. */
@@ -252,6 +329,18 @@ public class ProcessTableFunctionTest extends TableTestBase {
         public void eval(@ArgumentHint(TABLE_AS_SET) Row r, Integer i) {}
     }
 
+    /** Testing function. */
+    public static class UpdatingArgFunction extends ProcessTableFunction<String> {
+        @SuppressWarnings("unused")
+        public void eval(@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row r, Integer i) {}
+    }
+
+    /** Testing function. */
+    public static class InvalidTypedUpdatingArgFunction extends ProcessTableFunction<String> {
+        @SuppressWarnings("unused")
+        public void eval(@ArgumentHint({TABLE_AS_ROW, SUPPORT_UPDATES}) User u, Integer i) {}
+    }
+
     /** Testing function. */
     public static class MultiTableFunction extends ProcessTableFunction<String> {
         @SuppressWarnings("unused")
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index 5f4f90d18fd..3ab9cb43979 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -16,6 +16,93 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testEmptyArgs">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(uid => 'my-ptf')]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalTableFunctionScan(invocation=[f(_UTF-16LE'my-ptf')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(_UTF-16LE'my-ptf')], uid=[my-ptf], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Values(tuples=[[{  }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPojoArgs">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(input => TABLE t, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), pojoCreator(_UTF-16LE'Bob', 12), _UTF-16LE'my-ptf')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0), pojoCreator(_UTF-16LE'Bob', 12), _UTF-16LE'my-ptf')], uid=[my-ptf], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testScalarArgsNoUid">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(i => 1, b => true)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalTableFunctionScan(invocation=[f(1, true, DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(1, true, DEFAULT())], uid=[f], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Values(tuples=[[{  }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUidPipelineSplitIntoTwoFunctions">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- LogicalProject(name=[$0], EXPR$0=[$1])
+   +- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'a')], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalProject(name=[$0], score=[$1])
+            +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+
+LogicalSink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- LogicalProject(name=[$0], EXPR$0=[$1])
+   +- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'b')], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalProject(name=[$0], score=[$1])
+            +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Exchange(distribution=[hash[name]])(reuse_id=[1])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+
+Sink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'a')], uid=[a], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+   +- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'b')], uid=[b], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testScalarArgsWithUid">
     <Resource name="sql">
       <![CDATA[SELECT * FROM f(uid => 'my-uid', i => 1, b => true)]]>
@@ -28,48 +115,273 @@ LogicalProject(EXPR$0=[$0])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Correlate(invocation=[f(1, true, _UTF-16LE'my-uid')], correlate=[table(f(1,true,'my-uid'))], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+ProcessTableFunction(invocation=[f(1, true, _UTF-16LE'my-uid')], uid=[my-uid], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
 +- Values(tuples=[[{  }]])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testEmptyArgs">
+  <TestCase name="testTableAsRow">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM f(uid => 'my-ptf')]]>
+      <![CDATA[SELECT * FROM f(r => TABLE t, i => 1)]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(EXPR$0=[$0])
-+- LogicalTableFunctionScan(invocation=[f(_UTF-16LE'my-ptf')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Correlate(invocation=[f(_UTF-16LE'my-ptf')], correlate=[table(f('my-ptf'))], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-+- Values(tuples=[[{  }]])
+ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT())], uid=[f], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testUnknownScalarArg">
+  <TestCase name="testTableAsRowPassThroughColumns">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM f(i => 1, b => true, invalid => 'invalid')]]>
+      <![CDATA[SELECT * FROM f(r => TABLE t, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], score=[$1], EXPR$0=[$2])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(5) name, INTEGER score, VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT())], uid=[f], select=[name,score,EXPR$0], rowType=[RecordType(VARCHAR(5) name, INTEGER score, VARCHAR(2147483647) EXPR$0)])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTableAsSet">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], EXPR$0=[$1])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], uid=[f], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
++- Exchange(distribution=[hash[name]])
+   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTableAsSetPassThroughColumns">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], score=[$1], EXPR$0=[$2])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(5) name, INTEGER score, VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], uid=[f], select=[name,score,EXPR$0], rowType=[RecordType(VARCHAR(5) name, INTEGER score, VARCHAR(2147483647) EXPR$0)])
++- Exchange(distribution=[hash[name]])
+   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTypedTableAsRow">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(u => TABLE t, i => 1)]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(EXPR$0=[$0])
-+- LogicalTableFunctionScan(invocation=[f(1, true, DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Correlate(invocation=[f(1, true, DEFAULT())], correlate=[table(f(1,true,DEFAULT()))], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-+- Values(tuples=[[{  }]])
+ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT())], uid=[f], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testScalarArgsNoUid">
+  <TestCase name="testTypedTableAsRowIgnoringColumnNames">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM f(i => 1, b => true)]]>
+      <![CDATA[SELECT * FROM f(u => TABLE t_name_diff, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], different=[$1])
+      +- LogicalProject(name=[_UTF-16LE'Bob'], different=[12])
+         +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT())], uid=[f], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Calc(select=['Bob' AS name, 12 AS different])
+   +- Values(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTypedTableAsSet">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(u => TABLE t PARTITION BY name, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], EXPR$0=[$1])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], uid=[f], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
++- Exchange(distribution=[hash[name]])
+   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUidPipelineMergeIntoOneFunction">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- LogicalProject(name=[$0], EXPR$0=[$1])
+   +- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'same')], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalProject(name=[$0], score=[$1])
+            +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+
+LogicalSink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- LogicalProject(name=[$0], EXPR$0=[$1])
+   +- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'same')], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalProject(name=[$0], score=[$1])
+            +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'same')], uid=[same], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])(reuse_id=[1])
++- Exchange(distribution=[hash[name]])
+   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+
+Sink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUidPipelineMergeWithFanOut">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- LogicalProject(name=[$0], EXPR$0=[$1])
+   +- LogicalFilter(condition=[=($0, _UTF-16LE'Bob')])
+      +- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'same')], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+         +- LogicalProject(name=[$0], score=[$1])
+            +- LogicalProject(name=[$0], score=[$1])
+               +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+
+LogicalSink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- LogicalProject(name=[$0], EXPR$0=[$1])
+   +- LogicalFilter(condition=[=($0, _UTF-16LE'Alice')])
+      +- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'same')], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+         +- LogicalProject(name=[$0], score=[$1])
+            +- LogicalProject(name=[$0], score=[$1])
+               +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, _UTF-16LE'same')], uid=[same], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])(reuse_id=[1])
++- Exchange(distribution=[hash[name]])
+   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+
+Sink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- Calc(select=['Bob' AS name, EXPR$0], where=[(name = 'Bob')])
+   +- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.t_sink], fields=[name, EXPR$0])
++- Calc(select=['Alice' AS name, EXPR$0], where=[(name = 'Alice')])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpdatingInput">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], EXPR$0=[$1])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], EXPR$1=[$1])
+      +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+         +- LogicalProject(name=[$0])
+            +- LogicalProject(name=[$0], score=[$1])
+               +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT())], uid=[f], select=[name,EXPR$0], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) EXPR$0)])
++- Exchange(distribution=[hash[name]])
+   +- GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS EXPR$1])
+      +- Exchange(distribution=[hash[name]])
+         +- Calc(select=[name])
+            +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTableAsSetOptionalPartitionBy">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(r => TABLE t, i => 1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), 1, DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT())], uid=[f], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
++- Exchange(distribution=[single])
+   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnknownScalarArg">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(i => 1, b => true, invalid => 'invalid')]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -79,7 +391,7 @@ LogicalProject(EXPR$0=[$0])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Correlate(invocation=[f(1, true, DEFAULT())], correlate=[table(f(1,true,DEFAULT()))], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+ProcessTableFunction(invocation=[f(1, true, DEFAULT())], uid=[f], select=[EXPR$0], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
 +- Values(tuples=[[{  }]])
 ]]>
     </Resource>
