This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 948f06374d0 [FLINK-26712][table-planner] Metadata keys should not conflict with physical columns
948f06374d0 is described below

commit 948f06374d0c502437b56e25f6b3ec237fe7c720
Author: Timo Walther <[email protected]>
AuthorDate: Mon Mar 21 17:54:37 2022 +0100

    [FLINK-26712][table-planner] Metadata keys should not conflict with physical columns
    
    This reduces the likelihood of name collisions between metadata columns
    and physical columns. It might break connector implementations that used
    SupportsReading/WritingMetadata together with name-based column arithmetic.
    We recommend index-based column arithmetic instead of name-based.
    
    This closes #19236.
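    
    For connector authors, the practical consequence is that metadata columns
    keep their position at the end of the produced row type, while their field
    names are planner-chosen (prefixed with "$metadata$"). A minimal sketch of
    index-based access on the source side; registerMetadataColumn is a
    hypothetical helper, not part of this commit:
    
        // Inside a DynamicTableSource that implements SupportsReadingMetadata.
        @Override
        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
            final List<String> fieldNames = DataType.getFieldNames(producedDataType);
            // Metadata columns are appended at the end of producedDataType; their
            // names are decided by the planner, so count back from the end instead
            // of looking them up by key.
            final int firstMetadataPos = fieldNames.size() - metadataKeys.size();
            for (int i = 0; i < metadataKeys.size(); i++) {
                // Hypothetical helper that remembers which output position carries which key.
                registerMetadataColumn(metadataKeys.get(i), firstMetadataPos + i);
            }
            this.producedDataType = producedDataType; // forward as-is
        }
    
    The sink side is symmetric: applyWritableMetadata should forward
    consumedDataType unchanged and address metadata columns by position.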
---
 .../file/table/FileSystemTableSource.java          |  34 +-
 .../sink/abilities/SupportsWritingMetadata.java    |   5 +-
 .../source/abilities/SupportsReadingMetadata.java  |   5 +-
 .../table/planner/connectors/DynamicSinkUtils.java |   9 +-
 .../planner/connectors/DynamicSourceUtils.java     |  13 +-
 .../PushProjectIntoTableSourceScanRule.java        |   2 +
 .../planner/factories/TestValuesTableFactory.java  |   3 +-
 .../PushProjectIntoTableSourceScanRuleTest.java    |   4 +-
 .../file/table/FileSystemTableSourceTest.xml       |   4 +-
 .../planner/plan/batch/sql/TableSourceTest.xml     |   4 +-
 .../testWritingMetadata.out                        |   2 +-
 .../testReadingMetadata.out                        |  10 +-
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   |   4 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.xml    | 444 ++++++++++-----------
 .../plan/stream/sql/SourceWatermarkTest.xml        |   4 +-
 .../planner/plan/stream/sql/TableScanTest.xml      |  25 +-
 .../planner/plan/stream/sql/TableSinkTest.xml      |  19 +-
 .../planner/plan/stream/sql/TableSourceTest.xml    |   4 +-
 .../planner/plan/stream/sql/TableScanTest.scala    |  19 +
 .../planner/plan/stream/sql/TableSinkTest.scala    |  29 ++
 20 files changed, 375 insertions(+), 268 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 5fc8dd1e718..448a39618ed 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -64,8 +64,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;

+import static org.apache.flink.util.CollectionUtil.entry;
+
 /** File system table source. */
 @Internal
 public class FileSystemTableSource extends AbstractFileSystemTable
@@ -109,20 +112,14 @@ public class FileSystemTableSource extends AbstractFileSystemTable

     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        // When this table has no partition, just return a empty source.
+        // When this table has no partition, just return an empty source.
         if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
             return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
         }

         // Resolve metadata and make sure to filter out metadata not in the producedDataType
         final List<String> metadataKeys =
-                DataType.getFieldNames(producedDataType).stream()
-                        .filter(
-                                ((this.metadataKeys == null)
-                                                ? Collections.emptyList()
-                                                : this.metadataKeys)
-                                        ::contains)
-                        .collect(Collectors.toList());
+                this.metadataKeys == null ? Collections.emptyList() : this.metadataKeys;
         final List<ReadableFileInfo> metadataToExtract =
                 metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());

@@ -225,16 +222,27 @@ public class FileSystemTableSource extends AbstractFileSystemTable
             List<ReadableFileInfo> metadata,
             List<String> partitionKeys) {
         if (!metadata.isEmpty() || !partitionKeys.isEmpty()) {
+            final List<String> producedFieldNames = DataType.getFieldNames(producedDataType);
+            final Map<String, FileInfoAccessor> metadataColumns =
+                    IntStream.range(0, metadata.size())
+                            .mapToObj(
+                                    i -> {
+                                        // Access metadata columns from the back because the
+                                        // names are decided by the planner
+                                        final int columnPos =
+                                                producedFieldNames.size() - metadata.size() + i;
+                                        return entry(
+                                                producedFieldNames.get(columnPos),
+                                                metadata.get(i).getAccessor());
+                                    })
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
             bulkFormat =
                     new FileInfoExtractorBulkFormat(
                             bulkFormat,
                             producedDataType,
                             context.createTypeInformation(producedDataType),
-                            metadata.stream()
-                                    .collect(
-                                            Collectors.toMap(
-                                                    ReadableFileInfo::getKey,
-                                                    ReadableFileInfo::getAccessor)),
+                            metadataColumns,
                             partitionKeys,
                             defaultPartName);
         }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
index dd344950c1d..858bc05ef6c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
@@ -70,7 +70,7 @@ import java.util.Map;
  * <pre>{@code
  * // for t1 and t2
  * ROW < i INT, s STRING, d DOUBLE >                                              // physical input
- * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
+ * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
  *
  * // for t3
  * ROW < i INT, s STRING, d DOUBLE >                                              // physical input
@@ -115,7 +115,8 @@ public interface SupportsWritingMetadata {
      *
      * @param metadataKeys a subset of the keys returned by {@link #listWritableMetadata()}, ordered
      *     by the iteration order of returned map
-     * @param consumedDataType the final input type of the sink
+     * @param consumedDataType the final input type of the sink; it is intended to be only
+     *     forwarded, and the planner will decide on the field names to avoid collisions
      * @see EncodingFormat#applyWritableMetadata(List)
      */
     void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index dcf20353af4..ac462b668a9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -78,7 +78,7 @@ import java.util.Map;
  * <pre>{@code
  * // for t1 and t2
  * ROW < i INT, s STRING, d DOUBLE >                                              // physical output
- * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
+ * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
 * }</pre>
 */
@PublicEvolving
@@ -129,7 +129,8 @@ public interface SupportsReadingMetadata {
      *
      * @param metadataKeys a subset of the keys returned by {@link #listReadableMetadata()}, ordered
      *     by the iteration order of returned map
-     * @param producedDataType the final output type of the source
+     * @param producedDataType the final output type of the source; it is intended to be only
+     *     forwarded, and the planner will decide on the field names to avoid collisions
      * @see DecodingFormat#applyReadableMetadata(List)
      */
     void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index dd5a362b417..7a906ffa9bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -84,6 +84,9 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
 @Internal
 public final class DynamicSinkUtils {

+    // Ensures that physical and metadata columns don't collide.
+    private static final String METADATA_COLUMN_PREFIX = "$metadata$";
+
     /** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */
     public static RelNode convertCollectToRel(
             FlinkRelBuilder relBuilder,
@@ -642,7 +645,11 @@ public final class DynamicSinkUtils {

         final Stream<RowField> metadataFields =
                 createRequiredMetadataKeys(schema, sink).stream()
-                        .map(k -> new RowField(k, metadataMap.get(k).getLogicalType()));
+                        .map(
+                                k ->
+                                        new RowField(
+                                                METADATA_COLUMN_PREFIX + k,
+                                                metadataMap.get(k).getLogicalType()));

         final List<RowField> rowFields =
                 Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index e531da669bb..13dda6d39c6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -71,6 +71,9 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
 @Internal
 public final class DynamicSourceUtils {

+    // Ensures that physical and metadata columns don't collide.
+    public static final String METADATA_COLUMN_PREFIX = "$metadata$";
+
     /**
      * Converts a given {@link DataStream} to a {@link RelNode}. It adds helper projections if
      * necessary.
@@ -204,7 +207,11 @@ public final class DynamicSourceUtils {

         final Stream<RowField> metadataFields =
                 createRequiredMetadataKeys(schema, source).stream()
-                        .map(k -> new RowField(k, metadataMap.get(k).getLogicalType()));
+                        .map(
+                                k ->
+                                        new RowField(
+                                                METADATA_COLUMN_PREFIX + k,
+                                                metadataMap.get(k).getLogicalType()));

         final List<RowField> rowFields =
                 Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
@@ -315,7 +322,9 @@ public final class DynamicSourceUtils {
                                                         .getMetadataKey()
                                                         .orElse(metadataColumn.getName());
                                         return rexBuilder.makeAbstractCast(
-                                                relDataType, relBuilder.field(metadataKey));
+                                                relDataType,
+                                                relBuilder.field(
+                                                        METADATA_COLUMN_PREFIX + metadataKey));
                                     } else {
                                         return relBuilder.field(c.getName());
                                     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index a4044bf45b0..58af37cbcbf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;

+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.METADATA_COLUMN_PREFIX;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
 import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
@@ -308,6 +309,7 @@ public class PushProjectIntoTableSourceScanRule
             final List<String> projectedMetadataKeys =
                     projectedMetadataColumns.stream()
                             .map(NestedColumn::name)
+                            .map(k -> k.substring(METADATA_COLUMN_PREFIX.length()))
                             .collect(Collectors.toList());

             abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index db9585ea4bd..1c93fe10e02 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -402,8 +402,7 @@ public final class TestValuesTableFactory
             Collection<Row> data = registeredData.getOrDefault(dataId, Collections.emptyList());
             List<Map<String, String>> partitions =
                     parsePartitionList(helper.getOptions().get(PARTITION_LIST));
-            DataType producedDataType =
-                    context.getCatalogTable().getSchema().toPhysicalRowDataType();
+            DataType producedDataType = context.getPhysicalRowDataType();
             // pushing project into scan will prune schema and we have to get the mapping between
             // partition and row
             Map<Map<String, String>, Collection<Row>> partition2Rows;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index d98ac9414fb..3f38bae8ab5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -349,7 +349,7 @@ public class PushProjectIntoTableSourceScanRuleTest
                 equalTo(Collections.emptyList()));
         assertThat(
                 DataType.getFieldNames(appliedMetadataDataType.get()),
-                equalTo(Collections.singletonList("m2")));
+                equalTo(Collections.singletonList("$metadata$m2")));
     }

     @Test
@@ -375,7 +375,7 @@ public class PushProjectIntoTableSourceScanRuleTest
                 equalTo(Collections.singletonList("f1")));
         assertThat(
                 DataType.getFieldNames(appliedMetadataDataType.get()),
-                equalTo(Arrays.asList("f1", "m2")));
+                equalTo(Arrays.asList("f1", "$metadata$m2")));
     }

     // ---------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
index 6a2c8f88d9b..479eb8ad787 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
@@ -50,8 +50,8 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filem
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
-+- Calc(select=[a, b, CAST(file.path AS VARCHAR(2147483647)) AS filemeta])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, file.path])
++- Calc(select=[a, b, CAST($metadata$file.path AS VARCHAR(2147483647)) AS filemeta])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, $metadata$file.path])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 4c134457412..037458a3857 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -113,8 +113,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
index 60411d8a3d9..53107af6c88 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
@@ -69,7 +69,7 @@
       "abilities" : [ {
         "type" : "WritingMetadata",
         "metadataKeys" : [ "m" ],
-        "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
+        "consumedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL"
       } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
index a8014cf4400..81bc950fda9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
@@ -40,11 +40,11 @@
       }, {
         "type" : "ReadingMetadata",
         "metadataKeys" : [ "m" ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
+        "producedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL"
       } ]
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, m])",
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, $metadata$m])",
     "inputProperties" : [ ]
   }, {
     "id" : 2,
@@ -88,8 +88,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, m])"
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, $metadata$m])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
index deab0da75fd..b0874d69e86 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
@@ -111,8 +111,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4])
     <Resource name="optimized rel plan">
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, c, metadata, computed])
-+- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata_2 AS BIGINT) AS metadata, +(CAST(metadata_2 AS BIGINT), b) AS computed])
-   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata_2 AS BIGINT), +(CAST(metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, metadata_2])
++- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST($metadata$metadata_2 AS BIGINT) AS metadata, +(CAST($metadata$metadata_2 AS BIGINT), b) AS computed])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST($metadata$metadata_2 AS BIGINT), +(CAST($metadata$metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, $metadata$metadata_2])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index d97e11194e1..abaf03610cc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -16,13 +16,18 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testCanPushDownLocalHashAggWithGroup">
+  <TestCase name="testCannotPushDownLocalAggAfterLimitPushDown">
     <Resource name="sql">
       <![CDATA[SELECT
   sum(amount),
   name,
   type
-FROM inventory
+FROM (
+  SELECT
+    *
+  FROM inventory
+  LIMIT 100
+) t
   group by name, type]]>
     </Resource>
     <Resource name="ast">
@@ -30,7 +35,9 @@ FROM inventory
 LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
    +- LogicalProject(name=[$1], type=[$4], amount=[$2])
-      +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
+      +- LogicalSort(fetch=[100])
+         +- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], type=[$4])
+            +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -38,14 +45,19 @@ LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
 Calc(select=[EXPR$0, name, type])
 +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
    +- Exchange(distribution=[hash[name, type]])
-      +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, type, amount], metadata=[], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
+      +- LocalHashAggregate(groupBy=[name, type], select=[name, type, Partial_SUM(amount) AS sum$0])
+         +- Calc(select=[name, type, amount])
+            +- Limit(offset=[0], fetch=[100], global=[true])
+               +- Exchange(distribution=[single])
+                  +- Limit(offset=[0], fetch=[100], global=[false])
+                     +- TableSourceScan(table=[[default_catalog, default_database, inventory, limit=[100]]], fields=[id, name, amount, price, type])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDisablePushDownLocalAgg">
+  <TestCase name="testCannotPushDownLocalAggWithUDAF">
     <Resource name="sql">
       <![CDATA[SELECT
-  sum(amount),
+  udaf_collect(amount),
   name,
   type
 FROM inventory
@@ -54,7 +66,7 @@ FROM inventory
     <Resource name="ast">
       <![CDATA[
 LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[udaf_collect($2)])
    +- LogicalProject(name=[$1], type=[$4], amount=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
 ]]>
@@ -62,109 +74,185 @@ LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[EXPR$0, name, type])
-+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, 
Final_SUM(sum$0) AS EXPR$0])
-   +- Exchange(distribution=[hash[name, type]])
-      +- LocalHashAggregate(groupBy=[name, type], select=[name, type, 
Partial_SUM(amount) AS sum$0])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[name, type, amount], metadata=[]]], fields=[name, type, 
amount])
++- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, 
Final_udaf_collect(EXPR$0) AS EXPR$0])
+   +- Sort(orderBy=[name ASC, type ASC])
+      +- Exchange(distribution=[hash[name, type]])
+         +- LocalSortAggregate(groupBy=[name, type], select=[name, type, 
Partial_udaf_collect(amount) AS EXPR$0])
+            +- Sort(orderBy=[name ASC, type ASC])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[name, type, amount], metadata=[]]], fields=[name, type, 
amount])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCanPushDownLocalHashAggWithoutGroup">
+  <TestCase name="testCannotPushDownLocalAggWithUnsupportedDataTypes">
+    <Resource name="sql">
+      <![CDATA[SELECT
+  max(name),
+  type
+FROM inventory
+  group by type]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1], type=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+   +- LogicalProject(type=[$4], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0, type])
++- SortAggregate(isMerge=[true], groupBy=[type], select=[type, 
Final_MAX(max$0) AS EXPR$0])
+   +- Sort(orderBy=[type ASC])
+      +- Exchange(distribution=[hash[type]])
+         +- LocalSortAggregate(groupBy=[type], select=[type, Partial_MAX(name) 
AS max$0])
+            +- Sort(orderBy=[type ASC])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[type, name], metadata=[]]], fields=[type, name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCannotPushDownWithArgFilter">
     <Resource name="sql">
       <![CDATA[SELECT
   min(id),
   max(amount),
   sum(price),
-  avg(price),
-  count(id)
-FROM inventory]]>
+  count(id) FILTER(WHERE id > 100),
+  name
+FROM inventory
+  group by name]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)], 
EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
-+- LogicalProject(id=[$0], amount=[$2], price=[$3])
-   +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)], 
EXPR$2=[SUM($3)], EXPR$3=[COUNT($1) FILTER $4])
+   +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3], $f4=[IS 
TRUE(>($0, 100))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, 
Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3, 
count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
-+- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
project=[id, amount, price], metadata=[], aggregates=[grouping=[], 
aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]],
 fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
++- HashAggregate(isMerge=[true], groupBy=[name], select=[name, 
Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS 
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
+   +- Exchange(distribution=[hash[name]])
+      +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(id) AS 
min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2, 
Partial_COUNT(id) FILTER $f4 AS count$3])
+         +- Calc(select=[name, id, amount, price, IS TRUE(>(id, 100)) AS $f4])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[name, id, amount, price], metadata=[]]], fields=[name, id, 
amount, price])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCanPushDownLocalSortAggWithoutSort">
+  <TestCase name="testCannotPushDownWithColumnExpression">
     <Resource name="sql">
       <![CDATA[SELECT
-  min(id),
+  min(amount + price),
   max(amount),
   sum(price),
-  avg(price),
-  count(id)
-FROM inventory]]>
+  count(id),
+  name
+FROM inventory
+  group by name]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)], 
EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
-+- LogicalProject(id=[$0], amount=[$2], price=[$3])
-   +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)], 
EXPR$2=[SUM($3)], EXPR$3=[COUNT($4)])
+   +- LogicalProject(name=[$1], $f1=[+($2, $3)], amount=[$2], price=[$3], 
id=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-SortAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, 
Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3, 
count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
-+- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
project=[id, amount, price], metadata=[], aggregates=[grouping=[], 
aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]],
 fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
++- HashAggregate(isMerge=[true], groupBy=[name], select=[name, 
Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS 
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
+   +- Exchange(distribution=[hash[name]])
+      +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN($f1) AS 
min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2, 
Partial_COUNT(id) AS count$3])
+         +- Calc(select=[name, +(amount, price) AS $f1, amount, price, id])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[name, amount, price, id], metadata=[]]], fields=[name, 
amount, price, id])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCanPushDownLocalSortAggWithSort">
+  <TestCase name="testCannotPushDownWithUnsupportedAggFunction">
     <Resource name="sql">
       <![CDATA[SELECT
-  sum(amount),
-  name,
-  type
+  min(id),
+  max(amount),
+  sum(price),
+  count(distinct id),
+  name
 FROM inventory
-  group by name, type]]>
+  group by name]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
-   +- LogicalProject(name=[$1], type=[$4], amount=[$2])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)], 
EXPR$2=[SUM($3)], EXPR$3=[COUNT(DISTINCT $1)])
+   +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[EXPR$0, name, type])
-+- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, 
Final_SUM(sum$0) AS EXPR$0])
-   +- Sort(orderBy=[name ASC, type ASC])
-      +- Exchange(distribution=[hash[name, type]])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[name, type, amount], metadata=[], 
aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]], 
fields=[name, type, sum$0])
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
++- HashAggregate(isMerge=[true], groupBy=[name], select=[name, 
Final_MIN(min$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1, Final_MIN(min$2) AS 
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
+   +- Exchange(distribution=[hash[name]])
+      +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(EXPR$0) 
FILTER $g_1 AS min$0, Partial_MIN(EXPR$1) FILTER $g_1 AS min$1, 
Partial_MIN(EXPR$2) FILTER $g_1 AS min$2, Partial_COUNT(id) FILTER $g_0 AS 
count$3])
+         +- Calc(select=[name, id, EXPR$0, EXPR$1, EXPR$2, =(CASE(=($e, 0), 0, 
1), 0) AS $g_0, =(CASE(=($e, 0), 0, 1), 1) AS $g_1])
+            +- HashAggregate(isMerge=[true], groupBy=[name, id, $e], 
select=[name, id, $e, Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, 
Final_SUM(sum$2) AS EXPR$2])
+               +- Exchange(distribution=[hash[name, id, $e]])
+                  +- LocalHashAggregate(groupBy=[name, id, $e], select=[name, 
id, $e, Partial_MIN(id_0) AS min$0, Partial_MAX(amount) AS max$1, 
Partial_SUM(price) AS sum$2])
+                     +- Expand(projects=[{name, id, amount, price, 0 AS $e, id 
AS id_0}, {name, null AS id, amount, price, 1 AS $e, id AS id_0}])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, inventory, project=[name, id, amount, price], metadata=[]]], 
fields=[name, id, amount, price])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCanPushDownLocalAggWithAuxGrouping">
+  <TestCase name="testCannotPushDownWithWindowAggFunction">
     <Resource name="sql">
       <![CDATA[SELECT
-  id, name, count(*)
-FROM inventory_meta
-  group by id, name]]>
+  id,
+  amount,
+  sum(price) over (partition by name),
+  name
+FROM inventory]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()])
-+- LogicalProject(id=[$0], name=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory_meta]])
+LogicalProject(id=[$0], amount=[$2], EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION 
BY $1), 0), $SUM0($3) OVER (PARTITION BY $1), null:BIGINT)], name=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-HashAggregate(isMerge=[true], groupBy=[id], auxGrouping=[name], select=[id, 
name, Final_COUNT(count1$0) AS EXPR$2])
-+- Exchange(distribution=[hash[id]])
-   +- TableSourceScan(table=[[default_catalog, default_database, 
inventory_meta, project=[id, name], metadata=[], 
aggregates=[grouping=[id,name], aggFunctions=[Count1AggFunction()]]]], 
fields=[id, name, count1$0])
+Calc(select=[id, amount, CASE(>(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2, 
name])
++- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, 
$SUM0(price) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
+   +- Sort(orderBy=[name ASC])
+      +- Exchange(distribution=[hash[name]])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name, 
amount, price])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCanPushDownLocalHashAggWithGroup">
+    <Resource name="sql">
+      <![CDATA[SELECT
+  sum(amount),
+  name,
+  type
+FROM inventory
+  group by name, type]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(name=[$1], type=[$4], amount=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0, name, type])
++- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, 
Final_SUM(sum$0) AS EXPR$0])
+   +- Exchange(distribution=[hash[name, type]])
+      +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
project=[name, type, amount], metadata=[], aggregates=[grouping=[name,type], 
aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
 ]]>
     </Resource>
   </TestCase>
@@ -193,6 +281,28 @@ Calc(select=[EXPR$0, name, type])
 +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
    +- Exchange(distribution=[hash[name, type]])
       +- TableSourceScan(table=[[default_catalog, default_database, inventory, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCanPushDownLocalAggWithAuxGrouping">
+    <Resource name="sql">
+      <![CDATA[SELECT
+  id, name, count(*)
+FROM inventory_meta
+  group by id, name]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()])
++- LogicalProject(id=[$0], name=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[id], auxGrouping=[name], select=[id, name, Final_COUNT(count1$0) AS EXPR$2])
++- Exchange(distribution=[hash[id]])
+   +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, project=[id, name], metadata=[], aggregates=[grouping=[id,name], aggFunctions=[Count1AggFunction()]]]], fields=[id, name, count1$0])
 ]]>
     </Resource>
   </TestCase>
@@ -222,7 +332,7 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], name=[$0], type=[$1])
 Calc(select=[EXPR$0, EXPR$1, name, type])
 +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1])
    +- Exchange(distribution=[hash[name, type]])
-      +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction(metadata_1)]]]], fields=[name, type, sum$0, max$1])
+      +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction($metadata$metadata_1)]]]], fields=[name, type, sum$0, max$1])
 ]]>
     </Resource>
   </TestCase>
@@ -254,46 +364,39 @@ Calc(select=[EXPR$0, type, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCanPushDownLocalAggWithoutProjectionPushDown">
+  <TestCase name="testCanPushDownLocalHashAggWithoutGroup">
     <Resource name="sql">
       <![CDATA[SELECT
-  sum(amount),
-  name,
-  type
-FROM inventory_no_proj
-  where id = 123
-  group by name, type]]>
+  min(id),
+  max(amount),
+  sum(price),
+  avg(price),
+  count(id)
+FROM inventory]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
-   +- LogicalProject(name=[$1], type=[$4], amount=[$2])
-      +- LogicalFilter(condition=[=($0, 123)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, inventory_no_proj]])
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
++- LogicalProject(id=[$0], amount=[$2], price=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[EXPR$0, name, type])
-+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
-   +- Exchange(distribution=[hash[name, type]])
-      +- TableSourceScan(table=[[default_catalog, default_database, inventory_no_proj, filter=[=(id, 123:BIGINT)], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
+HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3, count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
++- Exchange(distribution=[single])
+   +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[id, amount, price], metadata=[], aggregates=[grouping=[], aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]], fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCannotPushDownLocalAggAfterLimitPushDown">
+  <TestCase name="testCanPushDownLocalAggWithoutProjectionPushDown">
     <Resource name="sql">
       <![CDATA[SELECT
   sum(amount),
   name,
   type
-FROM (
-  SELECT
-    *
-  FROM inventory
-  LIMIT 100
-) t
+FROM inventory_no_proj
+  where id = 123
   group by name, type]]>
     </Resource>
     <Resource name="ast">
@@ -301,9 +404,8 @@ FROM (
 LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
    +- LogicalProject(name=[$1], type=[$4], amount=[$2])
-      +- LogicalSort(fetch=[100])
-         +- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], 
type=[$4])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory]])
+      +- LogicalFilter(condition=[=($0, 123)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
inventory_no_proj]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -311,188 +413,86 @@ LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
 Calc(select=[EXPR$0, name, type])
 +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
    +- Exchange(distribution=[hash[name, type]])
-      +- LocalHashAggregate(groupBy=[name, type], select=[name, type, Partial_SUM(amount) AS sum$0])
-         +- Calc(select=[name, type, amount])
-            +- Limit(offset=[0], fetch=[100], global=[true])
-               +- Exchange(distribution=[single])
-                  +- Limit(offset=[0], fetch=[100], global=[false])
-                     +- TableSourceScan(table=[[default_catalog, default_database, inventory, limit=[100]]], fields=[id, name, amount, price, type])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testCannotPushDownLocalAggWithUDAF">
-    <Resource name="sql">
-      <![CDATA[SELECT
-  udaf_collect(amount),
-  name,
-  type
-FROM inventory
-  group by name, type]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[udaf_collect($2)])
-   +- LogicalProject(name=[$1], type=[$4], amount=[$2])
-      +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[EXPR$0, name, type])
-+- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_udaf_collect(EXPR$0) AS EXPR$0])
-   +- Sort(orderBy=[name ASC, type ASC])
-      +- Exchange(distribution=[hash[name, type]])
-         +- LocalSortAggregate(groupBy=[name, type], select=[name, type, Partial_udaf_collect(amount) AS EXPR$0])
-            +- Sort(orderBy=[name ASC, type ASC])
-               +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, type, amount], metadata=[]]], fields=[name, type, amount])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testCannotPushDownLocalAggWithUnsupportedDataTypes">
-    <Resource name="sql">
-      <![CDATA[SELECT
-  max(name),
-  type
-FROM inventory
-  group by type]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(EXPR$0=[$1], type=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
-   +- LogicalProject(type=[$4], name=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[EXPR$0, type])
-+- SortAggregate(isMerge=[true], groupBy=[type], select=[type, Final_MAX(max$0) AS EXPR$0])
-   +- Sort(orderBy=[type ASC])
-      +- Exchange(distribution=[hash[type]])
-         +- LocalSortAggregate(groupBy=[type], select=[type, Partial_MAX(name) AS max$0])
-            +- Sort(orderBy=[type ASC])
-               +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[type, name], metadata=[]]], fields=[type, name])
+      +- TableSourceScan(table=[[default_catalog, default_database, inventory_no_proj, filter=[=(id, 123:BIGINT)], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCannotPushDownWithColumnExpression">
+  <TestCase name="testCanPushDownLocalSortAggWithoutSort">
     <Resource name="sql">
       <![CDATA[SELECT
-  min(amount + price),
+  min(id),
   max(amount),
   sum(price),
-  count(id),
-  name
-FROM inventory
-  group by name]]>
+  avg(price),
+  count(id)
+FROM inventory]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)], EXPR$2=[SUM($3)], EXPR$3=[COUNT($4)])
-   +- LogicalProject(name=[$1], $f1=[+($2, $3)], amount=[$2], price=[$3], id=[$0])
-      +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
++- LogicalProject(id=[$0], amount=[$2], price=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
-+- HashAggregate(isMerge=[true], groupBy=[name], select=[name, Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_COUNT(count$3) AS EXPR$3])
-   +- Exchange(distribution=[hash[name]])
-      +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN($f1) AS min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2, Partial_COUNT(id) AS count$3])
-         +- Calc(select=[name, +(amount, price) AS $f1, amount, price, id])
-            +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, amount, price, id], metadata=[]]], fields=[name, amount, price, id])
+SortAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3, count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
++- Exchange(distribution=[single])
+   +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[id, amount, price], metadata=[], aggregates=[grouping=[], aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]], fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCannotPushDownWithUnsupportedAggFunction">
+  <TestCase name="testDisablePushDownLocalAgg">
     <Resource name="sql">
       <![CDATA[SELECT
-  min(id),
-  max(amount),
-  sum(price),
-  count(distinct id),
-  name
+  sum(amount),
+  name,
+  type
 FROM inventory
-  group by name]]>
+  group by name, type]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)], EXPR$2=[SUM($3)], EXPR$3=[COUNT(DISTINCT $1)])
-   +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3])
+LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(name=[$1], type=[$4], amount=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
-+- HashAggregate(isMerge=[true], groupBy=[name], select=[name, Final_MIN(min$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2, Final_COUNT(count$3) AS EXPR$3])
-   +- Exchange(distribution=[hash[name]])
-      +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(EXPR$0) FILTER $g_1 AS min$0, Partial_MIN(EXPR$1) FILTER $g_1 AS min$1, Partial_MIN(EXPR$2) FILTER $g_1 AS min$2, Partial_COUNT(id) FILTER $g_0 AS count$3])
-         +- Calc(select=[name, id, EXPR$0, EXPR$1, EXPR$2, =(CASE(=($e, 0), 0, 1), 0) AS $g_0, =(CASE(=($e, 0), 0, 1), 1) AS $g_1])
-            +- HashAggregate(isMerge=[true], groupBy=[name, id, $e], select=[name, id, $e, Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2])
-               +- Exchange(distribution=[hash[name, id, $e]])
-                  +- LocalHashAggregate(groupBy=[name, id, $e], select=[name, id, $e, Partial_MIN(id_0) AS min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2])
-                     +- Expand(projects=[{name, id, amount, price, 0 AS $e, id AS id_0}, {name, null AS id, amount, price, 1 AS $e, id AS id_0}])
-                        +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, id, amount, price], metadata=[]]], fields=[name, id, amount, price])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testCannotPushDownWithWindowAggFunction">
-    <Resource name="sql">
-      <![CDATA[SELECT
-  id,
-  amount,
-  sum(price) over (partition by name),
-  name
-FROM inventory]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(id=[$0], amount=[$2], EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION BY $1), 0), $SUM0($3) OVER (PARTITION BY $1), null:BIGINT)], name=[$1])
-+- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[id, amount, CASE(>(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2, name])
-+- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
-   +- Sort(orderBy=[name ASC])
-      +- Exchange(distribution=[hash[name]])
-         +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name, amount, price])
+Calc(select=[EXPR$0, name, type])
++- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
+   +- Exchange(distribution=[hash[name, type]])
+      +- LocalHashAggregate(groupBy=[name, type], select=[name, type, Partial_SUM(amount) AS sum$0])
+         +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, type, amount], metadata=[]]], fields=[name, type, amount])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCannotPushDownWithArgFilter">
+  <TestCase name="testCanPushDownLocalSortAggWithSort">
     <Resource name="sql">
       <![CDATA[SELECT
-  min(id),
-  max(amount),
-  sum(price),
-  count(id) FILTER(WHERE id > 100),
-  name
+  sum(amount),
+  name,
+  type
 FROM inventory
-  group by name]]>
+  group by name, type]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)], EXPR$2=[SUM($3)], EXPR$3=[COUNT($1) FILTER $4])
-   +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3], $f4=[IS TRUE(>($0, 100))])
+LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(name=[$1], type=[$4], amount=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
-+- HashAggregate(isMerge=[true], groupBy=[name], select=[name, Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_COUNT(count$3) AS EXPR$3])
-   +- Exchange(distribution=[hash[name]])
-      +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(id) AS min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2, Partial_COUNT(id) FILTER $f4 AS count$3])
-         +- Calc(select=[name, id, amount, price, IS TRUE(>(id, 100)) AS $f4])
-            +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, id, amount, price], metadata=[]]], fields=[name, id, amount, price])
+Calc(select=[EXPR$0, name, type])
++- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
+   +- Sort(orderBy=[name ASC, type ASC])
+      +- Exchange(distribution=[hash[name, type]])
+         +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[name, type, amount], metadata=[], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index b6fb8db11bd..88d4eba4d39 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -144,7 +144,7 @@ LogicalProject(a=[$0], b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, 
project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ(originTime, 
3)]]], fields=[a, b, originTime])
++- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, 
project=[a, b], metadata=[originTime], 
watermark=[TO_TIMESTAMP_LTZ($metadata$originTime, 3)]]], fields=[a, b, 
$metadata$originTime])
 ]]>
     </Resource>
   </TestCase>
@@ -182,7 +182,7 @@ LogicalProject(a=[$0], b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a, b], metadata=[originTime], 
watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/(originTime, 1000)), 
_UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, originTime])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a, b], metadata=[originTime], 
watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/($metadata$originTime, 1000)), 
_UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, $metadata$originTime])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 8759c6acf50..7c252f7ae31 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -192,8 +192,8 @@ LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], 
b=[$1], c=[$2], metada
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, CAST(metadata_3 AS INTEGER) AS other_metadata, b, c, 
metadata_1, UPPER(metadata_1) AS computed])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], 
fields=[a, b, c, metadata_1, metadata_3])
+Calc(select=[a, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata, b, c, 
$metadata$metadata_1 AS metadata_1, UPPER($metadata$metadata_1) AS computed])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], 
fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_3])
 ]]>
     </Resource>
   </TestCase>
@@ -209,8 +209,25 @@ LogicalProject(b=[$1], other_metadata=[CAST($4):INTEGER])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[b, CAST(metadata_3 AS INTEGER) AS other_metadata])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, 
project=[b], metadata=[metadata_3]]], fields=[b, metadata_3])
+Calc(select=[b, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, 
project=[b], metadata=[metadata_3]]], fields=[b, $metadata$metadata_3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDDLWithMetadataThatConflictsWithPhysicalColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MetadataTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(timestamp=[$0], metadata_timestamp=[$2], other=[$1], 
computed_other=[UPPER($1)], computed_timestamp=[CAST($2):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[timestamp, $metadata$timestamp AS metadata_timestamp, 
$metadata$other AS other, UPPER($metadata$other) AS computed_other, 
CAST($metadata$timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], 
fields=[timestamp, $metadata$other, $metadata$timestamp])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 78d2822cd4a..41f48c1cc4e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -489,8 +489,23 @@ 
LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[a, b
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.MetadataTable], fields=[a, b, c, 
metadata_1, metadata_2])
-+- Calc(select=[a, b, c, metadata_1, CAST(CAST(metadata_2 AS INTEGER) AS 
BIGINT) AS metadata_2])
-   +- TableSourceScan(table=[[default_catalog, default_database, 
MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]], 
fields=[a, b, c, metadata_1, metadata_2])
++- Calc(select=[a, b, c, $metadata$metadata_1 AS metadata_1, 
CAST(CAST($metadata$metadata_2 AS INTEGER) AS BIGINT) AS metadata_2])
+   +- TableSourceScan(table=[[default_catalog, default_database, 
MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]], 
fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataColumnThatConflictsWithPhysicalColumn">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.MetadataTable], 
fields=[metadata_1, metadata_2, other, metadata_23])
++- LogicalProject(metadata_1=[$0], metadata_2=[$1], other=[$2], 
metadata_23=[$4])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
MetadataTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.MetadataTable], 
fields=[metadata_1, metadata_2, other, $metadata$metadata_2])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, 
project=[metadata_1, metadata_2, other], metadata=[metadata_2]]], 
fields=[metadata_1, metadata_2, other, $metadata$metadata_2])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index f8009ff2197..648a40ad94e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -79,8 +79,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], 
results=[+(+($1.nested1.value, $1.
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + 
deepNested_nested2_num) + metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, 
deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], 
fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + 
deepNested_nested2_num) + $metadata$metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, 
deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], 
fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 63051cba01d..c380c7ac9c5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -99,6 +99,25 @@ class TableScanTest extends TableTestBase {
     util.verifyExecPlan("SELECT * FROM MetadataTable")
   }
 
+  @Test
+  def testDDLWithMetadataThatConflictsWithPhysicalColumn(): Unit = {
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `timestamp` TIMESTAMP(9),
+         |  `metadata_timestamp` TIMESTAMP(0) METADATA FROM 'timestamp',
+         |  `other` STRING METADATA,
+         |  `computed_other` AS UPPER(`other`),
+         |  `computed_timestamp` AS CAST(`metadata_timestamp` AS STRING)
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'readable-metadata' = 'timestamp:TIMESTAMP(0), other:STRING'
+         |)
+       """.stripMargin)
+    util.verifyExecPlan("SELECT * FROM MetadataTable")
+  }
+
   @Test
   def testDDLWithMetadataColumnProjectionPushDown(): Unit = {
     // tests reordering, skipping, and casting of metadata
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index e8bfaeef453..f568041691f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -429,6 +429,35 @@ class TableSinkTest extends TableTestBase {
     util.verifyRelPlan(stmtSet)
   }
 
+  @Test
+  def testMetadataColumnThatConflictsWithPhysicalColumn(): Unit = {
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `metadata_1` DOUBLE,
+         |  `m_1` STRING METADATA FROM 'metadata_1' VIRTUAL,
+         |  `m_2` BIGINT METADATA FROM 'metadata_2',
+         |  `metadata_2` DOUBLE,
+         |  `other` STRING
+         |) WITH (
+         |  'connector' = 'values',
+         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT',
+         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT `metadata_1`, `m_2`, `metadata_2`, `other`
+        |FROM MetadataTable
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    util.verifyRelPlan(stmtSet)
+  }
+
   @Test
   def testSinkDisorderChangeLogWithJoin(): Unit = {
     util.tableEnv.executeSql(
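
In summary, the plan diffs above reflect the new internal naming scheme: metadata
columns are now projected under a $metadata$ prefix (for example
$metadata$originTime or $metadata$timestamp), so a metadata key referenced via
METADATA FROM can no longer collide with a physical column of the same name. A
minimal SQL sketch of the conflict scenario, condensed from the new
testDDLWithMetadataThatConflictsWithPhysicalColumn case above (the 'values'
connector and its 'readable-metadata' option come straight from that test DDL):

    CREATE TABLE MetadataTable (
      `timestamp` TIMESTAMP(9),                                     -- physical column
      `metadata_timestamp` TIMESTAMP(0) METADATA FROM 'timestamp',  -- metadata key also named 'timestamp'
      `other` STRING METADATA
    ) WITH (
      'connector' = 'values',
      'bounded' = 'false',
      'readable-metadata' = 'timestamp:TIMESTAMP(0), other:STRING'
    );

    -- The optimized plan now reads the metadata key under an internal name,
    -- e.g. fields=[timestamp, $metadata$other, $metadata$timestamp], instead
    -- of clashing with the physical `timestamp` column.
    SELECT * FROM MetadataTable;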
