This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 948f06374d0 [FLINK-26712][table-planner] Metadata keys should not conflict with physical columns
948f06374d0 is described below
commit 948f06374d0c502437b56e25f6b3ec237fe7c720
Author: Timo Walther <[email protected]>
AuthorDate: Mon Mar 21 17:54:37 2022 +0100
    [FLINK-26712][table-planner] Metadata keys should not conflict with physical columns
    This reduces the likelihood of name collisions between metadata columns and
    physical columns. It might break some connector implementations that used
    SupportsReadingMetadata/SupportsWritingMetadata together with name-based column
    arithmetic. We recommend index-based column arithmetic over name-based
    arithmetic.
This closes #19236.
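
    For connector authors, the index-based pattern looks as follows. This is a
    minimal sketch of the approach taken in FileSystemTableSource below (an
    equivalent loop form of the IntStream version in the diff; `metadata` and
    `producedDataType` are fields of that class):

        // Metadata columns are appended at the end of producedDataType and their
        // names are decided by the planner, so count positions from the back
        // instead of looking columns up by name.
        final List<String> fieldNames = DataType.getFieldNames(producedDataType);
        final Map<String, FileInfoAccessor> metadataColumns = new HashMap<>();
        for (int i = 0; i < metadata.size(); i++) {
            final int columnPos = fieldNames.size() - metadata.size() + i;
            metadataColumns.put(fieldNames.get(columnPos), metadata.get(i).getAccessor());
        }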
---
.../file/table/FileSystemTableSource.java | 34 +-
.../sink/abilities/SupportsWritingMetadata.java | 5 +-
.../source/abilities/SupportsReadingMetadata.java | 5 +-
.../table/planner/connectors/DynamicSinkUtils.java | 9 +-
.../planner/connectors/DynamicSourceUtils.java | 13 +-
.../PushProjectIntoTableSourceScanRule.java | 2 +
.../planner/factories/TestValuesTableFactory.java | 3 +-
.../PushProjectIntoTableSourceScanRuleTest.java | 4 +-
.../file/table/FileSystemTableSourceTest.xml | 4 +-
.../planner/plan/batch/sql/TableSourceTest.xml | 4 +-
.../testWritingMetadata.out | 2 +-
.../testReadingMetadata.out | 10 +-
.../PushWatermarkIntoTableSourceScanRuleTest.xml | 4 +-
.../PushLocalAggIntoTableSourceScanRuleTest.xml | 444 ++++++++++-----------
.../plan/stream/sql/SourceWatermarkTest.xml | 4 +-
.../planner/plan/stream/sql/TableScanTest.xml | 25 +-
.../planner/plan/stream/sql/TableSinkTest.xml | 19 +-
.../planner/plan/stream/sql/TableSourceTest.xml | 4 +-
.../planner/plan/stream/sql/TableScanTest.scala | 19 +
.../planner/plan/stream/sql/TableSinkTest.scala | 29 ++
20 files changed, 375 insertions(+), 268 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 5fc8dd1e718..448a39618ed 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -64,8 +64,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.util.CollectionUtil.entry;
+
/** File system table source. */
@Internal
public class FileSystemTableSource extends AbstractFileSystemTable
@@ -109,20 +112,14 @@ public class FileSystemTableSource extends AbstractFileSystemTable
@Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- // When this table has no partition, just return a empty source.
+ // When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
             return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
}
         // Resolve metadata and make sure to filter out metadata not in the producedDataType
final List<String> metadataKeys =
- DataType.getFieldNames(producedDataType).stream()
- .filter(
- ((this.metadataKeys == null)
- ? Collections.emptyList()
- : this.metadataKeys)
- ::contains)
- .collect(Collectors.toList());
+                this.metadataKeys == null ? Collections.emptyList() : this.metadataKeys;
final List<ReadableFileInfo> metadataToExtract =
metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
@@ -225,16 +222,27 @@ public class FileSystemTableSource extends AbstractFileSystemTable
List<ReadableFileInfo> metadata,
List<String> partitionKeys) {
if (!metadata.isEmpty() || !partitionKeys.isEmpty()) {
+            final List<String> producedFieldNames = DataType.getFieldNames(producedDataType);
+            final Map<String, FileInfoAccessor> metadataColumns =
+                    IntStream.range(0, metadata.size())
+                            .mapToObj(
+                                    i -> {
+                                        // Access metadata columns from the back because the
+                                        // names are decided by the planner
+                                        final int columnPos =
+                                                producedFieldNames.size() - metadata.size() + i;
+                                        return entry(
+                                                producedFieldNames.get(columnPos),
+                                                metadata.get(i).getAccessor());
+                                    })
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
bulkFormat =
new FileInfoExtractorBulkFormat(
bulkFormat,
producedDataType,
context.createTypeInformation(producedDataType),
-                        metadata.stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                ReadableFileInfo::getKey,
-                                                ReadableFileInfo::getAccessor)),
+                        metadataColumns,
partitionKeys,
defaultPartName);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
index dd344950c1d..858bc05ef6c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
@@ -70,7 +70,7 @@ import java.util.Map;
* <pre>{@code
* // for t1 and t2
 * ROW < i INT, s STRING, d DOUBLE >                                              // physical input
- * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
+ * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
*
* // for t3
 * ROW < i INT, s STRING, d DOUBLE >                                              // physical input
@@ -115,7 +115,8 @@ public interface SupportsWritingMetadata {
*
     * @param metadataKeys a subset of the keys returned by {@link #listWritableMetadata()}, ordered
     *     by the iteration order of returned map
-     * @param consumedDataType the final input type of the sink
+     * @param consumedDataType the final input type of the sink; it is intended to be only
+     *     forwarded, and the planner will decide on the field names to avoid collisions
* @see EncodingFormat#applyWritableMetadata(List)
*/
    void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
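
As a usage sketch (a hypothetical MyDynamicSink, not part of this commit), an
implementation should store the keys and forward the consumed type untouched,
since its metadata field names are now planner-generated:

    public class MyDynamicSink implements DynamicTableSink, SupportsWritingMetadata {

        private List<String> metadataKeys = Collections.emptyList();
        private DataType consumedDataType;

        @Override
        public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
            // The keys keep their declared names; the trailing fields of
            // consumedDataType carry planner-decided names (prefixed with
            // "$metadata$"), so only their positions are reliable.
            this.metadataKeys = metadataKeys;
            this.consumedDataType = consumedDataType;
        }

        // remaining DynamicTableSink / SupportsWritingMetadata methods omitted
    }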
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index dcf20353af4..ac462b668a9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -78,7 +78,7 @@ import java.util.Map;
* <pre>{@code
* // for t1 and t2
 * ROW < i INT, s STRING, d DOUBLE >                                              // physical output
- * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
+ * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
* }</pre>
*/
@PublicEvolving
@@ -129,7 +129,8 @@ public interface SupportsReadingMetadata {
*
     * @param metadataKeys a subset of the keys returned by {@link #listReadableMetadata()}, ordered
     *     by the iteration order of returned map
-     * @param producedDataType the final output type of the source
+     * @param producedDataType the final output type of the source; it is intended to be only
+     *     forwarded, and the planner will decide on the field names to avoid collisions
* @see DecodingFormat#applyReadableMetadata(List)
*/
    void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
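
The symmetric source-side sketch (a hypothetical MyDynamicSource) records the
output position of each requested key instead of relying on the planner-generated
field names:

    public class MyDynamicSource implements ScanTableSource, SupportsReadingMetadata {

        // Metadata columns occupy the trailing positions of the produced row
        // type, in the iteration order of metadataKeys.
        private final Map<String, Integer> metadataPositions = new HashMap<>();
        private DataType producedDataType;

        @Override
        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
            final int fieldCount = DataType.getFieldNames(producedDataType).size();
            for (int i = 0; i < metadataKeys.size(); i++) {
                metadataPositions.put(metadataKeys.get(i), fieldCount - metadataKeys.size() + i);
            }
            this.producedDataType = producedDataType; // forward as-is
        }

        // remaining ScanTableSource / SupportsReadingMetadata methods omitted
    }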
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index dd5a362b417..7a906ffa9bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -84,6 +84,9 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
@Internal
public final class DynamicSinkUtils {
+ // Ensures that physical and metadata columns don't collide.
+ private static final String METADATA_COLUMN_PREFIX = "$metadata$";
+
/** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */
public static RelNode convertCollectToRel(
FlinkRelBuilder relBuilder,
@@ -642,7 +645,11 @@ public final class DynamicSinkUtils {
final Stream<RowField> metadataFields =
createRequiredMetadataKeys(schema, sink).stream()
-                        .map(k -> new RowField(k, metadataMap.get(k).getLogicalType()));
+                        .map(
+                                k ->
+                                        new RowField(
+                                                METADATA_COLUMN_PREFIX + k,
+                                                metadataMap.get(k).getLogicalType()));
         final List<RowField> rowFields =
                 Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index e531da669bb..13dda6d39c6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -71,6 +71,9 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
@Internal
public final class DynamicSourceUtils {
+ // Ensures that physical and metadata columns don't collide.
+ public static final String METADATA_COLUMN_PREFIX = "$metadata$";
+
/**
     * Converts a given {@link DataStream} to a {@link RelNode}. It adds helper projections if
     * necessary.
@@ -204,7 +207,11 @@ public final class DynamicSourceUtils {
final Stream<RowField> metadataFields =
createRequiredMetadataKeys(schema, source).stream()
-                        .map(k -> new RowField(k, metadataMap.get(k).getLogicalType()));
+                        .map(
+                                k ->
+                                        new RowField(
+                                                METADATA_COLUMN_PREFIX + k,
+                                                metadataMap.get(k).getLogicalType()));
         final List<RowField> rowFields =
                 Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
@@ -315,7 +322,9 @@ public final class DynamicSourceUtils {
.getMetadataKey()
.orElse(metadataColumn.getName());
                     return rexBuilder.makeAbstractCast(
-                            relDataType, relBuilder.field(metadataKey));
+                            relDataType,
+                            relBuilder.field(
+                                    METADATA_COLUMN_PREFIX + metadataKey));
} else {
return relBuilder.field(c.getName());
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index a4044bf45b0..58af37cbcbf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.METADATA_COLUMN_PREFIX;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
@@ -308,6 +309,7 @@ public class PushProjectIntoTableSourceScanRule
final List<String> projectedMetadataKeys =
projectedMetadataColumns.stream()
.map(NestedColumn::name)
+                        .map(k -> k.substring(METADATA_COLUMN_PREFIX.length()))
.collect(Collectors.toList());
        abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index db9585ea4bd..1c93fe10e02 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -402,8 +402,7 @@ public final class TestValuesTableFactory
        Collection<Row> data = registeredData.getOrDefault(dataId, Collections.emptyList());
List<Map<String, String>> partitions =
parsePartitionList(helper.getOptions().get(PARTITION_LIST));
-        DataType producedDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ DataType producedDataType = context.getPhysicalRowDataType();
        // pushing project into scan will prune schema and we have to get the mapping between
// partition and row
Map<Map<String, String>, Collection<Row>> partition2Rows;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index d98ac9414fb..3f38bae8ab5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -349,7 +349,7 @@ public class PushProjectIntoTableSourceScanRuleTest
equalTo(Collections.emptyList()));
assertThat(
DataType.getFieldNames(appliedMetadataDataType.get()),
- equalTo(Collections.singletonList("m2")));
+ equalTo(Collections.singletonList("$metadata$m2")));
}
@Test
@@ -375,7 +375,7 @@ public class PushProjectIntoTableSourceScanRuleTest
equalTo(Collections.singletonList("f1")));
assertThat(
DataType.getFieldNames(appliedMetadataDataType.get()),
- equalTo(Arrays.asList("f1", "m2")));
+ equalTo(Arrays.asList("f1", "$metadata$m2")));
}
    // ---------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
index 6a2c8f88d9b..479eb8ad787 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
@@ -50,8 +50,8 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filem
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
-+- Calc(select=[a, b, CAST(file.path AS VARCHAR(2147483647)) AS filemeta])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, file.path])
++- Calc(select=[a, b, CAST($metadata$file.path AS VARCHAR(2147483647)) AS filemeta])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, $metadata$file.path])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 4c134457412..037458a3857 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -113,8 +113,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
index 60411d8a3d9..53107af6c88 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
@@ -69,7 +69,7 @@
"abilities" : [ {
"type" : "WritingMetadata",
"metadataKeys" : [ "m" ],
- "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>
NOT NULL"
+ "consumedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m`
VARCHAR(2147483647)> NOT NULL"
} ]
},
"inputChangelogMode" : [ "INSERT" ],
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
index a8014cf4400..81bc950fda9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
@@ -40,11 +40,11 @@
}, {
"type" : "ReadingMetadata",
"metadataKeys" : [ "m" ],
- "producedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>
NOT NULL"
+ "producedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m`
VARCHAR(2147483647)> NOT NULL"
} ]
},
- "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
- "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, m])",
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b,
$metadata$m])",
"inputProperties" : [ ]
}, {
"id" : 2,
@@ -88,8 +88,8 @@
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
- "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
- "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[a, b, m])"
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[a, b, $metadata$m])"
} ],
"edges" : [ {
"source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
index deab0da75fd..b0874d69e86 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
@@ -111,8 +111,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4])
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, b, c, metadata, computed])
-+- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata_2 AS BIGINT) AS metadata, +(CAST(metadata_2 AS BIGINT), b) AS computed])
-   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata_2 AS BIGINT), +(CAST(metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, metadata_2])
++- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST($metadata$metadata_2 AS BIGINT) AS metadata, +(CAST($metadata$metadata_2 AS BIGINT), b) AS computed])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST($metadata$metadata_2 AS BIGINT), +(CAST($metadata$metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, $metadata$metadata_2])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index d97e11194e1..abaf03610cc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -16,13 +16,18 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testCanPushDownLocalHashAggWithGroup">
+ <TestCase name="testCannotPushDownLocalAggAfterLimitPushDown">
<Resource name="sql">
<![CDATA[SELECT
sum(amount),
name,
type
-FROM inventory
+FROM (
+ SELECT
+ *
+ FROM inventory
+ LIMIT 100
+) t
group by name, type]]>
</Resource>
<Resource name="ast">
@@ -30,7 +35,9 @@ FROM inventory
LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+- LogicalProject(name=[$1], type=[$4], amount=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
+ +- LogicalSort(fetch=[100])
+ +- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3],
type=[$4])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -38,14 +45,19 @@ LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
Calc(select=[EXPR$0, name, type])
+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
+- Exchange(distribution=[hash[name, type]])
- +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[name, type, amount], metadata=[], aggregates=[grouping=[name,type],
aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
+ +- LocalHashAggregate(groupBy=[name, type], select=[name, type,
Partial_SUM(amount) AS sum$0])
+ +- Calc(select=[name, type, amount])
+ +- Limit(offset=[0], fetch=[100], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[100], global=[false])
+ +- TableSourceScan(table=[[default_catalog,
default_database, inventory, limit=[100]]], fields=[id, name, amount, price,
type])
]]>
</Resource>
</TestCase>
- <TestCase name="testDisablePushDownLocalAgg">
+ <TestCase name="testCannotPushDownLocalAggWithUDAF">
<Resource name="sql">
<![CDATA[SELECT
- sum(amount),
+ udaf_collect(amount),
name,
type
FROM inventory
@@ -54,7 +66,7 @@ FROM inventory
<Resource name="ast">
<![CDATA[
LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[udaf_collect($2)])
+- LogicalProject(name=[$1], type=[$4], amount=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
@@ -62,109 +74,185 @@ LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[EXPR$0, name, type])
-+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
- +- Exchange(distribution=[hash[name, type]])
- +- LocalHashAggregate(groupBy=[name, type], select=[name, type,
Partial_SUM(amount) AS sum$0])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, type, amount], metadata=[]]], fields=[name, type,
amount])
++- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_udaf_collect(EXPR$0) AS EXPR$0])
+ +- Sort(orderBy=[name ASC, type ASC])
+ +- Exchange(distribution=[hash[name, type]])
+ +- LocalSortAggregate(groupBy=[name, type], select=[name, type,
Partial_udaf_collect(amount) AS EXPR$0])
+ +- Sort(orderBy=[name ASC, type ASC])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, type, amount], metadata=[]]], fields=[name, type,
amount])
]]>
</Resource>
</TestCase>
- <TestCase name="testCanPushDownLocalHashAggWithoutGroup">
+ <TestCase name="testCannotPushDownLocalAggWithUnsupportedDataTypes">
+ <Resource name="sql">
+ <![CDATA[SELECT
+ max(name),
+ type
+FROM inventory
+ group by type]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1], type=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+ +- LogicalProject(type=[$4], name=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0, type])
++- SortAggregate(isMerge=[true], groupBy=[type], select=[type,
Final_MAX(max$0) AS EXPR$0])
+ +- Sort(orderBy=[type ASC])
+ +- Exchange(distribution=[hash[type]])
+ +- LocalSortAggregate(groupBy=[type], select=[type, Partial_MAX(name)
AS max$0])
+ +- Sort(orderBy=[type ASC])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[type, name], metadata=[]]], fields=[type, name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithArgFilter">
<Resource name="sql">
<![CDATA[SELECT
min(id),
max(amount),
sum(price),
- avg(price),
- count(id)
-FROM inventory]]>
+ count(id) FILTER(WHERE id > 100),
+ name
+FROM inventory
+ group by name]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)],
EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
-+- LogicalProject(id=[$0], amount=[$2], price=[$3])
- +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)],
EXPR$2=[SUM($3)], EXPR$3=[COUNT($1) FILTER $4])
+ +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3], $f4=[IS
TRUE(>($0, 100))])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0,
Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3,
count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
-+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id, amount, price], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]],
fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
++- HashAggregate(isMerge=[true], groupBy=[name], select=[name,
Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
+ +- Exchange(distribution=[hash[name]])
+ +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(id) AS
min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2,
Partial_COUNT(id) FILTER $f4 AS count$3])
+ +- Calc(select=[name, id, amount, price, IS TRUE(>(id, 100)) AS $f4])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, id, amount, price], metadata=[]]], fields=[name, id,
amount, price])
]]>
</Resource>
</TestCase>
- <TestCase name="testCanPushDownLocalSortAggWithoutSort">
+ <TestCase name="testCannotPushDownWithColumnExpression">
<Resource name="sql">
<![CDATA[SELECT
- min(id),
+ min(amount + price),
max(amount),
sum(price),
- avg(price),
- count(id)
-FROM inventory]]>
+ count(id),
+ name
+FROM inventory
+ group by name]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)],
EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
-+- LogicalProject(id=[$0], amount=[$2], price=[$3])
- +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)],
EXPR$2=[SUM($3)], EXPR$3=[COUNT($4)])
+ +- LogicalProject(name=[$1], $f1=[+($2, $3)], amount=[$2], price=[$3],
id=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-SortAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0,
Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3,
count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
-+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id, amount, price], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]],
fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
++- HashAggregate(isMerge=[true], groupBy=[name], select=[name,
Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
+ +- Exchange(distribution=[hash[name]])
+ +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN($f1) AS
min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2,
Partial_COUNT(id) AS count$3])
+ +- Calc(select=[name, +(amount, price) AS $f1, amount, price, id])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, amount, price, id], metadata=[]]], fields=[name,
amount, price, id])
]]>
</Resource>
</TestCase>
- <TestCase name="testCanPushDownLocalSortAggWithSort">
+ <TestCase name="testCannotPushDownWithUnsupportedAggFunction">
<Resource name="sql">
<![CDATA[SELECT
- sum(amount),
- name,
- type
+ min(id),
+ max(amount),
+ sum(price),
+ count(distinct id),
+ name
FROM inventory
- group by name, type]]>
+ group by name]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
- +- LogicalProject(name=[$1], type=[$4], amount=[$2])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
++- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)],
EXPR$2=[SUM($3)], EXPR$3=[COUNT(DISTINCT $1)])
+ +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[EXPR$0, name, type])
-+- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
- +- Sort(orderBy=[name ASC, type ASC])
- +- Exchange(distribution=[hash[name, type]])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, type, amount], metadata=[],
aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]],
fields=[name, type, sum$0])
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
++- HashAggregate(isMerge=[true], groupBy=[name], select=[name,
Final_MIN(min$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1, Final_MIN(min$2) AS
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
+ +- Exchange(distribution=[hash[name]])
+ +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(EXPR$0)
FILTER $g_1 AS min$0, Partial_MIN(EXPR$1) FILTER $g_1 AS min$1,
Partial_MIN(EXPR$2) FILTER $g_1 AS min$2, Partial_COUNT(id) FILTER $g_0 AS
count$3])
+ +- Calc(select=[name, id, EXPR$0, EXPR$1, EXPR$2, =(CASE(=($e, 0), 0,
1), 0) AS $g_0, =(CASE(=($e, 0), 0, 1), 1) AS $g_1])
+ +- HashAggregate(isMerge=[true], groupBy=[name, id, $e],
select=[name, id, $e, Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1,
Final_SUM(sum$2) AS EXPR$2])
+ +- Exchange(distribution=[hash[name, id, $e]])
+ +- LocalHashAggregate(groupBy=[name, id, $e], select=[name,
id, $e, Partial_MIN(id_0) AS min$0, Partial_MAX(amount) AS max$1,
Partial_SUM(price) AS sum$2])
+ +- Expand(projects=[{name, id, amount, price, 0 AS $e, id
AS id_0}, {name, null AS id, amount, price, 1 AS $e, id AS id_0}])
+ +- TableSourceScan(table=[[default_catalog,
default_database, inventory, project=[name, id, amount, price], metadata=[]]],
fields=[name, id, amount, price])
]]>
</Resource>
</TestCase>
- <TestCase name="testCanPushDownLocalAggWithAuxGrouping">
+ <TestCase name="testCannotPushDownWithWindowAggFunction">
<Resource name="sql">
<![CDATA[SELECT
- id, name, count(*)
-FROM inventory_meta
- group by id, name]]>
+ id,
+ amount,
+ sum(price) over (partition by name),
+ name
+FROM inventory]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()])
-+- LogicalProject(id=[$0], name=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory_meta]])
+LogicalProject(id=[$0], amount=[$2], EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION
BY $1), 0), $SUM0($3) OVER (PARTITION BY $1), null:BIGINT)], name=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-HashAggregate(isMerge=[true], groupBy=[id], auxGrouping=[name], select=[id,
name, Final_COUNT(count1$0) AS EXPR$2])
-+- Exchange(distribution=[hash[id]])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory_meta, project=[id, name], metadata=[],
aggregates=[grouping=[id,name], aggFunctions=[Count1AggFunction()]]]],
fields=[id, name, count1$0])
+Calc(select=[id, amount, CASE(>(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2,
name])
++- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0,
$SUM0(price) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
+ +- Sort(orderBy=[name ASC])
+ +- Exchange(distribution=[hash[name]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name,
amount, price])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushDownLocalHashAggWithGroup">
+ <Resource name="sql">
+ <![CDATA[SELECT
+ sum(amount),
+ name,
+ type
+FROM inventory
+ group by name, type]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(name=[$1], type=[$4], amount=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0, name, type])
++- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
+ +- Exchange(distribution=[hash[name, type]])
+ +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[name, type, amount], metadata=[], aggregates=[grouping=[name,type],
aggFunctions=[LongSumAggFunction(amount)]]]], fields=[name, type, sum$0])
]]>
</Resource>
</TestCase>
@@ -193,6 +281,28 @@ Calc(select=[EXPR$0, name, type])
+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
+- Exchange(distribution=[hash[name, type]])
+- TableSourceScan(table=[[default_catalog, default_database, inventory,
filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[],
aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]],
fields=[name, type, sum$0])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushDownLocalAggWithAuxGrouping">
+ <Resource name="sql">
+ <![CDATA[SELECT
+ id, name, count(*)
+FROM inventory_meta
+ group by id, name]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()])
++- LogicalProject(id=[$0], name=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory_meta]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[id], auxGrouping=[name], select=[id,
name, Final_COUNT(count1$0) AS EXPR$2])
++- Exchange(distribution=[hash[id]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory_meta, project=[id, name], metadata=[],
aggregates=[grouping=[id,name], aggFunctions=[Count1AggFunction()]]]],
fields=[id, name, count1$0])
]]>
</Resource>
</TestCase>
@@ -222,7 +332,7 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], name=[$0], type=[$1])
Calc(select=[EXPR$0, EXPR$1, name, type])
+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1])
   +- Exchange(distribution=[hash[name, type]])
-      +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction(metadata_1)]]]], fields=[name, type, sum$0, max$1])
+      +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction($metadata$metadata_1)]]]], fields=[name, type, sum$0, max$1])
]]>
</Resource>
</TestCase>
@@ -254,46 +364,39 @@ Calc(select=[EXPR$0, type, name])
]]>
</Resource>
</TestCase>
- <TestCase name="testCanPushDownLocalAggWithoutProjectionPushDown">
+ <TestCase name="testCanPushDownLocalHashAggWithoutGroup">
<Resource name="sql">
<![CDATA[SELECT
- sum(amount),
- name,
- type
-FROM inventory_no_proj
- where id = 123
- group by name, type]]>
+ min(id),
+ max(amount),
+ sum(price),
+ avg(price),
+ count(id)
+FROM inventory]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
- +- LogicalProject(name=[$1], type=[$4], amount=[$2])
- +- LogicalFilter(condition=[=($0, 123)])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory_no_proj]])
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)],
EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
++- LogicalProject(id=[$0], amount=[$2], price=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[EXPR$0, name, type])
-+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
- +- Exchange(distribution=[hash[name, type]])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory_no_proj, filter=[=(id, 123:BIGINT)],
aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]],
fields=[name, type, sum$0])
+HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0,
Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3,
count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
++- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id, amount, price], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]],
fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
]]>
</Resource>
</TestCase>
- <TestCase name="testCannotPushDownLocalAggAfterLimitPushDown">
+ <TestCase name="testCanPushDownLocalAggWithoutProjectionPushDown">
<Resource name="sql">
<![CDATA[SELECT
sum(amount),
name,
type
-FROM (
- SELECT
- *
- FROM inventory
- LIMIT 100
-) t
+FROM inventory_no_proj
+ where id = 123
group by name, type]]>
</Resource>
<Resource name="ast">
@@ -301,9 +404,8 @@ FROM (
LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+- LogicalProject(name=[$1], type=[$4], amount=[$2])
- +- LogicalSort(fetch=[100])
- +- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3],
type=[$4])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
+ +- LogicalFilter(condition=[=($0, 123)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
inventory_no_proj]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -311,188 +413,86 @@ LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
Calc(select=[EXPR$0, name, type])
+- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
+- Exchange(distribution=[hash[name, type]])
- +- LocalHashAggregate(groupBy=[name, type], select=[name, type,
Partial_SUM(amount) AS sum$0])
- +- Calc(select=[name, type, amount])
- +- Limit(offset=[0], fetch=[100], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[100], global=[false])
- +- TableSourceScan(table=[[default_catalog,
default_database, inventory, limit=[100]]], fields=[id, name, amount, price,
type])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testCannotPushDownLocalAggWithUDAF">
- <Resource name="sql">
- <![CDATA[SELECT
- udaf_collect(amount),
- name,
- type
-FROM inventory
- group by name, type]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
-+- LogicalAggregate(group=[{0, 1}], EXPR$0=[udaf_collect($2)])
- +- LogicalProject(name=[$1], type=[$4], amount=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[EXPR$0, name, type])
-+- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_udaf_collect(EXPR$0) AS EXPR$0])
- +- Sort(orderBy=[name ASC, type ASC])
- +- Exchange(distribution=[hash[name, type]])
- +- LocalSortAggregate(groupBy=[name, type], select=[name, type,
Partial_udaf_collect(amount) AS EXPR$0])
- +- Sort(orderBy=[name ASC, type ASC])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, type, amount], metadata=[]]], fields=[name, type,
amount])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testCannotPushDownLocalAggWithUnsupportedDataTypes">
- <Resource name="sql">
- <![CDATA[SELECT
- max(name),
- type
-FROM inventory
- group by type]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(EXPR$0=[$1], type=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
- +- LogicalProject(type=[$4], name=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[EXPR$0, type])
-+- SortAggregate(isMerge=[true], groupBy=[type], select=[type,
Final_MAX(max$0) AS EXPR$0])
- +- Sort(orderBy=[type ASC])
- +- Exchange(distribution=[hash[type]])
- +- LocalSortAggregate(groupBy=[type], select=[type, Partial_MAX(name)
AS max$0])
- +- Sort(orderBy=[type ASC])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[type, name], metadata=[]]], fields=[type, name])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory_no_proj, filter=[=(id, 123:BIGINT)],
aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]],
fields=[name, type, sum$0])
]]>
</Resource>
</TestCase>
- <TestCase name="testCannotPushDownWithColumnExpression">
+ <TestCase name="testCanPushDownLocalSortAggWithoutSort">
<Resource name="sql">
<![CDATA[SELECT
- min(amount + price),
+ min(id),
max(amount),
sum(price),
- count(id),
- name
-FROM inventory
- group by name]]>
+ avg(price),
+ count(id)
+FROM inventory]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)],
EXPR$2=[SUM($3)], EXPR$3=[COUNT($4)])
- +- LogicalProject(name=[$1], $f1=[+($2, $3)], amount=[$2], price=[$3],
id=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)], EXPR$1=[MAX($1)],
EXPR$2=[SUM($2)], EXPR$3=[AVG($2)], EXPR$4=[COUNT($0)])
++- LogicalProject(id=[$0], amount=[$2], price=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
-+- HashAggregate(isMerge=[true], groupBy=[name], select=[name,
Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
- +- Exchange(distribution=[hash[name]])
- +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN($f1) AS
min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2,
Partial_COUNT(id) AS count$3])
- +- Calc(select=[name, +(amount, price) AS $f1, amount, price, id])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, amount, price, id], metadata=[]]], fields=[name,
amount, price, id])
+SortAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0,
Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS EXPR$2, Final_AVG(sum$3,
count$4) AS EXPR$3, Final_COUNT(count$5) AS EXPR$4])
++- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id, amount, price], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongMinAggFunction(id),LongMaxAggFunction(amount),LongSumAggFunction(price),LongSum0AggFunction(price),CountAggFunction(price),CountAggFunction(id)]]]],
fields=[min$0, max$1, sum$2, sum$3, count$4, count$5])
]]>
</Resource>
</TestCase>
- <TestCase name="testCannotPushDownWithUnsupportedAggFunction">
+ <TestCase name="testDisablePushDownLocalAgg">
<Resource name="sql">
<![CDATA[SELECT
- min(id),
- max(amount),
- sum(price),
- count(distinct id),
- name
+ sum(amount),
+ name,
+ type
FROM inventory
- group by name]]>
+ group by name, type]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)],
EXPR$2=[SUM($3)], EXPR$3=[COUNT(DISTINCT $1)])
- +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3])
+LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(name=[$1], type=[$4], amount=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
-+- HashAggregate(isMerge=[true], groupBy=[name], select=[name,
Final_MIN(min$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1, Final_MIN(min$2) AS
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
- +- Exchange(distribution=[hash[name]])
- +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(EXPR$0)
FILTER $g_1 AS min$0, Partial_MIN(EXPR$1) FILTER $g_1 AS min$1,
Partial_MIN(EXPR$2) FILTER $g_1 AS min$2, Partial_COUNT(id) FILTER $g_0 AS
count$3])
- +- Calc(select=[name, id, EXPR$0, EXPR$1, EXPR$2, =(CASE(=($e, 0), 0,
1), 0) AS $g_0, =(CASE(=($e, 0), 0, 1), 1) AS $g_1])
- +- HashAggregate(isMerge=[true], groupBy=[name, id, $e],
select=[name, id, $e, Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1,
Final_SUM(sum$2) AS EXPR$2])
- +- Exchange(distribution=[hash[name, id, $e]])
- +- LocalHashAggregate(groupBy=[name, id, $e], select=[name,
id, $e, Partial_MIN(id_0) AS min$0, Partial_MAX(amount) AS max$1,
Partial_SUM(price) AS sum$2])
- +- Expand(projects=[{name, id, amount, price, 0 AS $e, id
AS id_0}, {name, null AS id, amount, price, 1 AS $e, id AS id_0}])
- +- TableSourceScan(table=[[default_catalog,
default_database, inventory, project=[name, id, amount, price], metadata=[]]],
fields=[name, id, amount, price])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testCannotPushDownWithWindowAggFunction">
- <Resource name="sql">
- <![CDATA[SELECT
- id,
- amount,
- sum(price) over (partition by name),
- name
-FROM inventory]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(id=[$0], amount=[$2], EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION
BY $1), 0), $SUM0($3) OVER (PARTITION BY $1), null:BIGINT)], name=[$1])
-+- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[id, amount, CASE(>(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2,
name])
-+- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0,
$SUM0(price) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
- +- Sort(orderBy=[name ASC])
- +- Exchange(distribution=[hash[name]])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name,
amount, price])
+Calc(select=[EXPR$0, name, type])
++- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
+ +- Exchange(distribution=[hash[name, type]])
+ +- LocalHashAggregate(groupBy=[name, type], select=[name, type,
Partial_SUM(amount) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, type, amount], metadata=[]]], fields=[name, type,
amount])
]]>
</Resource>
</TestCase>
- <TestCase name="testCannotPushDownWithArgFilter">
+ <TestCase name="testCanPushDownLocalSortAggWithSort">
<Resource name="sql">
<![CDATA[SELECT
- min(id),
- max(amount),
- sum(price),
- count(id) FILTER(WHERE id > 100),
- name
+ sum(amount),
+ name,
+ type
FROM inventory
- group by name]]>
+ group by name, type]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], name=[$0])
-+- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)], EXPR$1=[MAX($2)],
EXPR$2=[SUM($3)], EXPR$3=[COUNT($1) FILTER $4])
- +- LogicalProject(name=[$1], id=[$0], amount=[$2], price=[$3], $f4=[IS
TRUE(>($0, 100))])
+LogicalProject(EXPR$0=[$2], name=[$0], type=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(name=[$1], type=[$4], amount=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database,
inventory]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name])
-+- HashAggregate(isMerge=[true], groupBy=[name], select=[name,
Final_MIN(min$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_SUM(sum$2) AS
EXPR$2, Final_COUNT(count$3) AS EXPR$3])
- +- Exchange(distribution=[hash[name]])
- +- LocalHashAggregate(groupBy=[name], select=[name, Partial_MIN(id) AS
min$0, Partial_MAX(amount) AS max$1, Partial_SUM(price) AS sum$2,
Partial_COUNT(id) FILTER $f4 AS count$3])
- +- Calc(select=[name, id, amount, price, IS TRUE(>(id, 100)) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, id, amount, price], metadata=[]]], fields=[name, id,
amount, price])
+Calc(select=[EXPR$0, name, type])
++- SortAggregate(isMerge=[true], groupBy=[name, type], select=[name, type,
Final_SUM(sum$0) AS EXPR$0])
+ +- Sort(orderBy=[name ASC, type ASC])
+ +- Exchange(distribution=[hash[name, type]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
inventory, project=[name, type, amount], metadata=[],
aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount)]]]],
fields=[name, type, sum$0])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index b6fb8db11bd..88d4eba4d39 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -144,7 +144,7 @@ LogicalProject(a=[$0], b=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ(originTime, 3)]]], fields=[a, b, originTime])
++- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ($metadata$originTime, 3)]]], fields=[a, b, $metadata$originTime])
]]>
</Resource>
</TestCase>
@@ -182,7 +182,7 @@ LogicalProject(a=[$0], b=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/(originTime, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, originTime])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/($metadata$originTime, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, $metadata$originTime])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 8759c6acf50..7c252f7ae31 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -192,8 +192,8 @@ LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metada
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, CAST(metadata_3 AS INTEGER) AS other_metadata, b, c, metadata_1, UPPER(metadata_1) AS computed])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, metadata_1, metadata_3])
+Calc(select=[a, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata, b, c, $metadata$metadata_1 AS metadata_1, UPPER($metadata$metadata_1) AS computed])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_3])
]]>
</Resource>
</TestCase>
@@ -209,8 +209,25 @@ LogicalProject(b=[$1], other_metadata=[CAST($4):INTEGER])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[b, CAST(metadata_3 AS INTEGER) AS other_metadata])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[b], metadata=[metadata_3]]], fields=[b, metadata_3])
+Calc(select=[b, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[b], metadata=[metadata_3]]], fields=[b, $metadata$metadata_3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDDLWithMetadataThatConflictsWithPhysicalColumn">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MetadataTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(timestamp=[$0], metadata_timestamp=[$2], other=[$1],
computed_other=[UPPER($1)], computed_timestamp=[CAST($2):VARCHAR(2147483647)
CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[timestamp, $metadata$timestamp AS metadata_timestamp,
$metadata$other AS other, UPPER($metadata$other) AS computed_other,
CAST($metadata$timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]],
fields=[timestamp, $metadata$other, $metadata$timestamp])
]]>
</Resource>
</TestCase>
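The new TestCase demonstrates the conflict handling on the scan side: the
table (see the DDL added in TableScanTest.scala further below) declares a
physical column `timestamp` and a metadata column metadata_timestamp that
reads the metadata key 'timestamp'. The scan internally renames the produced
metadata fields to $metadata$other and $metadata$timestamp, while the
SQL-level column names stay untouched, so both columns remain addressable
(illustrative query; the test itself only runs SELECT *):

    -- `timestamp` resolves to the physical column, metadata_timestamp to
    -- the metadata key of the same name
    SELECT `timestamp`, metadata_timestamp FROM MetadataTable;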
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 78d2822cd4a..41f48c1cc4e 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -489,8 +489,23 @@
LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[a, b
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.MetadataTable], fields=[a, b, c,
metadata_1, metadata_2])
-+- Calc(select=[a, b, c, metadata_1, CAST(CAST(metadata_2 AS INTEGER) AS
BIGINT) AS metadata_2])
- +- TableSourceScan(table=[[default_catalog, default_database,
MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]],
fields=[a, b, c, metadata_1, metadata_2])
++- Calc(select=[a, b, c, $metadata$metadata_1 AS metadata_1,
CAST(CAST($metadata$metadata_2 AS INTEGER) AS BIGINT) AS metadata_2])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]],
fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMetadataColumnThatConflictsWithPhysicalColumn">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.MetadataTable],
fields=[metadata_1, metadata_2, other, metadata_23])
++- LogicalProject(metadata_1=[$0], metadata_2=[$1], other=[$2],
metadata_23=[$4])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MetadataTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.MetadataTable],
fields=[metadata_1, metadata_2, other, $metadata$metadata_2])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable,
project=[metadata_1, metadata_2, other], metadata=[metadata_2]]],
fields=[metadata_1, metadata_2, other, $metadata$metadata_2])
]]>
</Resource>
</TestCase>
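The sink-side TestCase mirrors this: column m_2 writes the metadata key
'metadata_2' although a physical column metadata_2 exists, so the plan carries
the metadata value as $metadata$metadata_2 (the duplicate output name appears
uniquified as metadata_23 in the AST above). The statement under test is the
INSERT added in TableSinkTest.scala further below:

    INSERT INTO MetadataTable
    SELECT `metadata_1`, `m_2`, `metadata_2`, `other`
    FROM MetadataTable;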
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index f8009ff2197..648a40ad94e 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -79,8 +79,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1],
results=[+(+($1.nested1.value, $1.
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value +
deepNested_nested2_num) + metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id,
deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]],
fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value +
deepNested_nested2_num) + $metadata$metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id,
deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]],
fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
]]>
</Resource>
</TestCase>
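Here the metadata column participates in an expression over nested fields that
are themselves pushed down as deepNested_nested1 and deepNested_nested2_num.
A query reconstructed from the LogicalProject above would read (the nested
field names deepNested.nested1.value and deepNested.nested2.num are inferred
from the pushed projection, not taken verbatim from the test):

    -- hypothetical query; `value` is quoted to avoid a keyword clash
    SELECT id,
           deepNested.nested1 AS nested1,
           deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 AS results
    FROM T;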
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 63051cba01d..c380c7ac9c5 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -99,6 +99,25 @@ class TableScanTest extends TableTestBase {
util.verifyExecPlan("SELECT * FROM MetadataTable")
}
+ @Test
+ def testDDLWithMetadataThatConflictsWithPhysicalColumn(): Unit = {
+ util.addTable(
+ s"""
+ |CREATE TABLE MetadataTable (
+ | `timestamp` TIMESTAMP(9),
+ | `metadata_timestamp` TIMESTAMP(0) METADATA FROM 'timestamp',
+ | `other` STRING METADATA,
+ | `computed_other` AS UPPER(`other`),
+ | `computed_timestamp` AS CAST(`metadata_timestamp` AS STRING)
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'readable-metadata' = 'timestamp:TIMESTAMP(0), other:STRING'
+ |)
+ """.stripMargin)
+ util.verifyExecPlan("SELECT * FROM MetadataTable")
+ }
+
@Test
def testDDLWithMetadataColumnProjectionPushDown(): Unit = {
// tests reordering, skipping, and casting of metadata
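The METADATA FROM 'timestamp' clause in the test added above is what creates
the collision under test: the metadata key matches the name of the physical
column `timestamp`, and the planner keeps the two apart by prefixing the
produced metadata field, as the matching plan in TableScanTest.xml earlier in
this diff shows. Reduced to its essentials, the colliding pattern is
(a minimal sketch; the column name ts_meta, types, and options are
illustrative only):

    CREATE TABLE t (
      `timestamp` TIMESTAMP(9),                         -- physical column
      `ts_meta` TIMESTAMP(0) METADATA FROM 'timestamp'  -- reads metadata key 'timestamp'
    ) WITH (
      'connector' = 'values',
      'readable-metadata' = 'timestamp:TIMESTAMP(0)'
    );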
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index e8bfaeef453..f568041691f 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -429,6 +429,35 @@ class TableSinkTest extends TableTestBase {
util.verifyRelPlan(stmtSet)
}
+ @Test
+ def testMetadataColumnThatConflictsWithPhysicalColumn(): Unit = {
+ util.addTable(
+ s"""
+ |CREATE TABLE MetadataTable (
+ | `metadata_1` DOUBLE,
+ | `m_1` STRING METADATA FROM 'metadata_1' VIRTUAL,
+ | `m_2` BIGINT METADATA FROM 'metadata_2',
+ | `metadata_2` DOUBLE,
+ | `other` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT',
+ | 'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'
+ |)
+ """.stripMargin)
+
+ val sql =
+ """
+ |INSERT INTO MetadataTable
+ |SELECT `metadata_1`, `m_2`, `metadata_2`, `other`
+ |FROM MetadataTable
+ |""".stripMargin
+ val stmtSet = util.tableEnv.createStatementSet()
+ stmtSet.addInsertSql(sql)
+
+ util.verifyRelPlan(stmtSet)
+ }
+
@Test
def testSinkDisorderChangeLogWithJoin(): Unit = {
util.tableEnv.executeSql(