This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push: new d014c22d880 [FLINK-37098] Fix selecting time attribute from a view d014c22d880 is described below commit d014c22d88029aefe578caf20bfad254bace2da4 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Jan 10 15:18:37 2025 +0100 [FLINK-37098] Fix selecting time attribute from a view --- .../flink/table/api/internal/ShowCreateUtil.java | 6 ++- .../table/api/internal/TableEnvironmentImpl.java | 2 +- .../apache/flink/table/catalog/CatalogManager.java | 57 +++++++++++++++++++- .../table/catalog/QueryOperationCatalogView.java | 42 ++++++++++++--- .../catalog/CatalogBaseTableResolutionTest.java | 3 +- .../flink/table/catalog/CatalogManagerTest.java | 4 +- .../flink/table/utils/CatalogManagerMocks.java | 3 +- .../org/apache/flink/table/utils/ParserMock.java | 3 +- .../org/apache/flink/table/catalog/Column.java | 18 +++++++ .../catalog/QueryOperationCatalogViewTable.java | 4 +- .../table/planner/catalog/SqlCatalogViewTable.java | 62 +--------------------- .../planner/catalog/JavaCatalogTableTest.java | 25 +++++++++ .../SqlNodeToOperationConversionTestBase.java | 3 +- .../flink/table/planner/utils/PlannerMocks.java | 3 +- .../analyze/NonDeterministicUpdateAnalyzerTest.xml | 62 ++++++++++++---------- .../table/planner/catalog/JavaCatalogTableTest.xml | 28 +++++++++- .../planner/plan/common/ViewsExpandingTest.xml | 20 ++++--- .../plan/stream/sql/NonDeterministicDagTest.xml | 36 ++++++------- .../plan/stream/sql/agg/WindowAggregateTest.xml | 36 +++++++------ 19 files changed, 261 insertions(+), 156 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index b3a5d8e6f02..cf789115626 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -92,8 +92,10 @@ public class ShowCreateUtil { "SHOW CREATE VIEW is only supported for views, but %s is a table. Please use SHOW CREATE TABLE instead.", viewIdentifier.asSerializableString())); } - StringBuilder stringBuilder = new StringBuilder(); - if (view.getOrigin() instanceof QueryOperationCatalogView) { + final StringBuilder stringBuilder = new StringBuilder(); + final CatalogBaseTable origin = view.getOrigin(); + if (origin instanceof QueryOperationCatalogView + && !((QueryOperationCatalogView) origin).supportsShowCreateView()) { throw new TableException( "SHOW CREATE VIEW is not supported for views registered by Table API."); } else { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index c4a58d9d351..c5c8df7e022 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -217,7 +217,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { getParser()::parseSqlExpression, isStreamingMode); catalogManager.initSchemaResolver( - isStreamingMode, operationTreeBuilder.getResolverBuilder()); + isStreamingMode, operationTreeBuilder.getResolverBuilder(), getParser()); this.operationCtx = new ExecutableOperationContextImpl( catalogManager, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index bba3673c435..3dca9ab10ba 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -43,9 +43,12 @@ import org.apache.flink.table.catalog.listener.CreateDatabaseEvent; import org.apache.flink.table.catalog.listener.CreateTableEvent; import org.apache.flink.table.catalog.listener.DropDatabaseEvent; import org.apache.flink.table.catalog.listener.DropTableEvent; +import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -66,6 +69,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import static java.lang.String.format; @@ -93,6 +97,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { private @Nullable String currentDatabaseName; private DefaultSchemaResolver schemaResolver; + private Parser parser; // The name of the built-in catalog private final String builtInCatalogName; @@ -274,9 +279,12 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { * @see TableEnvironmentImpl#create(EnvironmentSettings) */ public void initSchemaResolver( - boolean isStreamingMode, ExpressionResolverBuilder expressionResolverBuilder) { + boolean isStreamingMode, + ExpressionResolverBuilder expressionResolverBuilder, + Parser parser) { this.schemaResolver = new DefaultSchemaResolver(isStreamingMode, typeFactory, expressionResolverBuilder); + this.parser = parser; } /** Returns a {@link SchemaResolver} for creating {@link ResolvedSchema} from {@link Schema}. */ @@ -1496,8 +1504,53 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { if (view instanceof ResolvedCatalogView) { return (ResolvedCatalogView) view; } + + if (view instanceof QueryOperationCatalogView) { + final QueryOperation queryOperation = + ((QueryOperationCatalogView) view).getQueryOperation(); + return new ResolvedCatalogView(view, queryOperation.getResolvedSchema()); + } + final ResolvedSchema resolvedSchema = view.getUnresolvedSchema().resolve(schemaResolver); - return new ResolvedCatalogView(view, resolvedSchema); + final List<Operation> parse; + try { + parse = parser.parse(view.getExpandedQuery()); + } catch (Throwable e) { + // in case of a failure during parsing, let the lower layers fail + return new ResolvedCatalogView(view, resolvedSchema); + } + if (parse.size() != 1 || !(parse.get(0) instanceof QueryOperation)) { + // parsing a view should result in a single query operation + // if it is not what we expect, we let the lower layers fail + return new ResolvedCatalogView(view, resolvedSchema); + } else { + final QueryOperation operation = (QueryOperation) parse.get(0); + final ResolvedSchema querySchema = operation.getResolvedSchema(); + if (querySchema.getColumns().size() != resolvedSchema.getColumns().size()) { + // in case the query does not match the number of expected columns, let the lower + // layers fail + return new ResolvedCatalogView(view, resolvedSchema); + } + final ResolvedSchema renamedQuerySchema = + new ResolvedSchema( + IntStream.range(0, resolvedSchema.getColumnCount()) + .mapToObj( + i -> + querySchema + .getColumn(i) + .get() + .rename( + resolvedSchema + .getColumnNames() + .get(i))) + .collect(Collectors.toList()), + resolvedSchema.getWatermarkSpecs(), + resolvedSchema.getPrimaryKey().orElse(null)); + return new ResolvedCatalogView( + // pass a view that has the query parsed and + // validated already + new QueryOperationCatalogView(operation, view), renamedQuerySchema); + } } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java index dc4f0a7c370..0a820bb3ca9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java @@ -24,6 +24,8 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableException; import org.apache.flink.table.operations.QueryOperation; +import javax.annotation.Nullable; + import java.util.Map; import java.util.Optional; @@ -32,9 +34,16 @@ import java.util.Optional; public final class QueryOperationCatalogView implements CatalogView { private final QueryOperation queryOperation; + private final @Nullable CatalogView originalView; public QueryOperationCatalogView(QueryOperation queryOperation) { + this(queryOperation, null); + } + + public QueryOperationCatalogView( + final QueryOperation queryOperation, final CatalogView originalView) { this.queryOperation = queryOperation; + this.originalView = originalView; } public QueryOperation getQueryOperation() { @@ -48,17 +57,23 @@ public final class QueryOperationCatalogView implements CatalogView { @Override public Map<String, String> getOptions() { - throw new TableException("A view backed by a query operation has no options."); + if (originalView == null) { + throw new TableException("A view backed by a query operation has no options."); + } else { + return originalView.getOptions(); + } } @Override public String getComment() { - return queryOperation.asSummaryString(); + return Optional.ofNullable(originalView) + .map(CatalogView::getComment) + .orElseGet(queryOperation::asSummaryString); } @Override public QueryOperationCatalogView copy() { - return new QueryOperationCatalogView(queryOperation); + return new QueryOperationCatalogView(queryOperation, originalView); } @Override @@ -73,13 +88,26 @@ public final class QueryOperationCatalogView implements CatalogView { @Override public String getOriginalQuery() { - throw new TableException( - "A view backed by a query operation has no serializable representation."); + if (originalView == null) { + throw new TableException( + "A view backed by a query operation has no serializable representation."); + } else { + return originalView.getOriginalQuery(); + } } @Override public String getExpandedQuery() { - throw new TableException( - "A view backed by a query operation has no serializable representation."); + if (originalView == null) { + throw new TableException( + "A view backed by a query operation has no serializable representation."); + } else { + return originalView.getExpandedQuery(); + } + } + + @Internal + public boolean supportsShowCreateView() { + return originalView != null; } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index a9436ac21df..9afc5545d0a 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.table.utils.ExpressionResolverMocks; +import org.apache.flink.table.utils.ParserMock; import org.junit.jupiter.api.Test; @@ -482,7 +483,7 @@ class CatalogBaseTableResolutionTest { ExpressionResolverMocks.forSqlExpression( CatalogBaseTableResolutionTest::resolveSqlExpression); - catalogManager.initSchemaResolver(true, expressionResolverBuilder); + catalogManager.initSchemaResolver(true, expressionResolverBuilder, new ParserMock()); return catalogManager; } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java index 5dac3b1add9..9470eb136f3 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java @@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.listener.DropDatabaseEvent; import org.apache.flink.table.catalog.listener.DropTableEvent; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.table.utils.ExpressionResolverMocks; +import org.apache.flink.table.utils.ParserMock; import org.junit.jupiter.api.Test; @@ -129,7 +130,8 @@ class CatalogManagerTest { dropFuture, dropTemporaryFuture)); - catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver()); + catalogManager.initSchemaResolver( + true, ExpressionResolverMocks.dummyResolver(), new ParserMock()); // Create a view catalogManager.createTable( CatalogView.of(Schema.newBuilder().build(), null, "", "", Collections.emptyMap()), diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java index f21ee3f1b85..3946294d480 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java @@ -71,7 +71,8 @@ public final class CatalogManagerMocks { builder.catalogStoreHolder(catalogStoreHolder); } final CatalogManager catalogManager = builder.build(); - catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver()); + catalogManager.initSchemaResolver( + true, ExpressionResolverMocks.dummyResolver(), new ParserMock()); return catalogManager; } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java index 526c682fb1d..9cce26051c6 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java @@ -27,13 +27,14 @@ import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; /** Mocks {@link Parser} for tests. */ public class ParserMock implements Parser { @Override public List<Operation> parse(String statement) { - return null; + return Collections.emptyList(); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java index faeae16b75b..11a72a0b48d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java @@ -135,6 +135,9 @@ public abstract class Column { /** Returns a copy of the column with a replaced {@link DataType}. */ public abstract Column copy(DataType newType); + /** Returns a copy of the column with a replaced name. */ + public abstract Column rename(String newName); + @Override public boolean equals(Object o) { if (this == o) { @@ -202,6 +205,11 @@ public abstract class Column { public Column copy(DataType newDataType) { return new PhysicalColumn(name, newDataType, comment); } + + @Override + public Column rename(String newName) { + return new PhysicalColumn(newName, dataType, comment); + } } /** Representation of a computed column. */ @@ -252,6 +260,11 @@ public abstract class Column { return new ComputedColumn(name, newDataType, expression, comment); } + @Override + public Column rename(String newName) { + return new ComputedColumn(newName, dataType, expression, comment); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -344,6 +357,11 @@ public abstract class Column { return new MetadataColumn(name, newDataType, metadataKey, isVirtual, comment); } + @Override + public Column rename(String newName) { + return new MetadataColumn(newName, dataType, metadataKey, isVirtual, comment); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java index 32353c182de..91a42179730 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java @@ -30,6 +30,7 @@ import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; @@ -77,6 +78,7 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable { final Context chain = Contexts.of(context, cluster.getPlanner().getContext()); final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema()); - return relBuilder.queryOperation(catalogView.getQueryOperation()).build(); + return RelOptUtil.createCastRel( + relBuilder.queryOperation(catalogView.getQueryOperation()).build(), rowType, true); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java index a6ccd06f9cb..f4c60563d8c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java @@ -20,19 +20,15 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable; -import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelRecordType; import javax.annotation.Nullable; -import java.util.AbstractList; import java.util.List; /** @@ -60,62 +56,6 @@ public class SqlCatalogViewTable extends ExpandingPreparingTable { public RelNode convertToRel(ToRelContext context) { RelNode original = context.expandView(rowType, view.getExpandedQuery(), viewPath, names).project(); - RelDataType castTargetType = - adaptTimeAttributes( - original.getRowType(), rowType, context.getCluster().getTypeFactory()); - return RelOptUtil.createCastRel(original, castTargetType, true); - } - - private static RelDataType adaptTimeAttributes( - RelDataType queryType, RelDataType targetType, RelDataTypeFactory typeFactory) { - if (queryType instanceof RelRecordType) { - if (RelOptUtil.areRowTypesEqual(queryType, targetType, true)) { - return targetType; - } else if (targetType.getFieldCount() != queryType.getFieldCount()) { - throw new IllegalArgumentException( - "Field counts are not equal: queryType [" - + queryType - + "]" - + " castRowType [" - + targetType - + "]"); - } else { - return adaptTimeAttributeInRecord( - (RelRecordType) queryType, (RelRecordType) targetType, typeFactory); - } - } else { - return adaptTimeAttributeInSimpleType(queryType, targetType, typeFactory); - } - } - - private static RelDataType adaptTimeAttributeInRecord( - RelRecordType queryType, RelRecordType targetType, RelDataTypeFactory typeFactory) { - RelDataType structType = - typeFactory.createStructType( - targetType.getStructKind(), - new AbstractList<RelDataType>() { - public RelDataType get(int index) { - RelDataType targetFieldType = - (targetType.getFieldList().get(index)).getType(); - RelDataType queryFieldType = - (queryType.getFieldList().get(index)).getType(); - return adaptTimeAttributes( - queryFieldType, targetFieldType, typeFactory); - } - - public int size() { - return targetType.getFieldCount(); - } - }, - targetType.getFieldNames()); - return typeFactory.createTypeWithNullability(structType, targetType.isNullable()); - } - - private static RelDataType adaptTimeAttributeInSimpleType( - RelDataType queryType, RelDataType targetType, RelDataTypeFactory typeFactory) { - if (queryType instanceof TimeIndicatorRelDataType) { - return typeFactory.createTypeWithNullability(queryType, targetType.isNullable()); - } - return targetType; + return RelOptUtil.createCastRel(original, rowType, true); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java index 419d14987f7..3b17fb8d464 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java @@ -177,6 +177,31 @@ class JavaCatalogTableTest extends TableTestBase { + "GROUP BY window_start, window_end"); } + @TestTemplate + void testTimeAttributeOfViewSelect() { + if (!streamingMode) { + // time attributes not supported in batch + return; + } + TableTestUtil testUtil = getTestUtil(); + TableEnvironment tableEnvironment = testUtil.getTableEnv(); + tableEnvironment.registerCatalog("cat", new CustomCatalog("cat")); + tableEnvironment.executeSql( + "CREATE TABLE `cat`.`default`.`t`(" + + " order_id INT, " + + " customer_id INT, " + + " product_id INT, " + + " product_ids ARRAY<INT>, " + + " ts TIMESTAMP_LTZ(3), WATERMARK FOR ts AS ts) " + + "WITH ('connector' = 'datagen')"); + tableEnvironment.executeSql( + "CREATE VIEW `cat`.`default`.v AS " + + "SELECT `o`.`order_id`, `o`.`customer_id`, `pids`.`product_id`, `o`.`ts`\n" + + "FROM `cat`.`default`.`t` AS `o`\n" + + "CROSS JOIN UNNEST(`o`.`product_ids`) AS `pids` (`product_id`)"); + testUtil.verifyExecPlan("SELECT * FROM `cat`.`default`.v"); + } + private static class CustomCatalog extends GenericInMemoryCatalog { public CustomCatalog(String name) { super(name); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java index 2b87c89ebf8..38bff172d09 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java @@ -99,7 +99,8 @@ public class SqlNodeToOperationConversionTestBase { public void before() throws TableAlreadyExistException, DatabaseNotExistException { catalogManager.initSchemaResolver( isStreamingMode, - ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser)); + ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser), + parser); final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1"); final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2"); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java index b852661f34e..1b430e0f448 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java @@ -105,7 +105,8 @@ public class PlannerMocks { }, functionCatalog.asLookup(parser::parseIdentifier), catalogManager.getDataTypeFactory(), - parser::parseSqlExpression)); + parser::parseSqlExpression), + parser); } public FlinkPlannerImpl getPlanner() { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml index 9726b5753b9..a2bb0a112ab 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml @@ -77,6 +77,26 @@ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) advice[1]: [WARNING] You might want to enable upsert materialization for look up join operator by configuring ('table.optimizer.non-deterministic-update.strategy' to 'TRY_RESOLVE') to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. +]]> + </Resource> + </TestCase> + <TestCase name="testCdcSourceWithoutPkSinkWithoutPk"> + <Resource name="sql"> + <![CDATA[insert into sink_without_pk +select metadata_1, b, metadata_2 +from cdc_without_pk]]> + </Resource> + <Resource name="optimized rel plan with advice"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink_without_pk], fields=[metadata_1, b, metadata_2]) ++- Calc(select=[metadata_1, b, metadata_2]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2]) + +advice[1]: [WARNING] The metadata column(s): 'metadata_1, metadata_2' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages. +source node: +TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2], changelogMode=[I,UB,UA,D]) + + ]]> </Resource> </TestCase> @@ -157,45 +177,31 @@ Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3]) +- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3]) +- Exchange(distribution=[hash[a, day]]) - +- Calc(select=[a, day, b, c]) - +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + +- Calc(select=[a, day, b0 AS b, c]) + +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, day, b0, day0, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) + : +- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b]) +- Exchange(distribution=[hash[d]]) - +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) + +- Calc(select=[b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd')) AS day, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) -Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c]) -+- Calc(select=[a, day, b, c]) - +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b0, c]) ++- Calc(select=[a, day, b0, c], where=[>(b0, 100)]) + +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, day, b0, day0, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) + : +- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b]) +- Exchange(distribution=[hash[d]]) - +- Calc(select=[b, c, d], where=[>(b, 100)]) + +- Calc(select=[b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd')) AS day, c, d]) +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value). advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions. related rel plan: -Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D]) -+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D]) - - -]]> - </Resource> - </TestCase> - <TestCase name="testCdcSourceWithoutPkSinkWithoutPk"> - <Resource name="optimized rel plan with advice"> - <![CDATA[ -Sink(table=[default_catalog.default_database.sink_without_pk], fields=[metadata_1, b, metadata_2]) -+- Calc(select=[metadata_1, b, metadata_2]) - +- TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2]) - -advice[1]: [WARNING] The metadata column(s): 'metadata_1, metadata_2' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages. -source node: -TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2], changelogMode=[I,UB,UA,D]) +Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D]) ++- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml index 224f8503f5a..a8b689b40b0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml @@ -161,7 +161,7 @@ GROUP BY window_start, window_end]]> LogicalProject(EXPR$0=[$2], window_start=[$0]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)]) +- LogicalProject(window_start=[$2], window_end=[$3], i=[$0]) - +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER i, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) window_time)]) + +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER i, TIMESTAMP_LTZ(3) *ROWTIME* ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *ROWTIME* window_time)]) +- LogicalProject(i=[$0], ts=[$1]) +- LogicalProject(i=[$0], ts=[$1]) +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$1]) @@ -176,6 +176,32 @@ Calc(select=[EXPR$0, window_start]) +- LocalWindowAggregate(window=[TUMBLE(time_col=[ts], size=[10 min])], select=[SUM(i) AS sum$0, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[ts], watermark=[ts]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, ts]) +]]> + </Resource> + </TestCase> + <TestCase name="testTimeAttributeOfViewSelect[streamingMode = true]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM `cat`.`default`.v]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(order_id=[$0], customer_id=[$1], product_id=[$2], ts=[$3]) ++- LogicalProject(order_id=[$0], customer_id=[$1], product_id=[$5], ts=[$4]) + +- LogicalCorrelate(correlation=[$cor3], joinType=[inner], requiredColumns=[{3}]) + :- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$4]) + : +- LogicalTableScan(table=[[cat, default, t]]) + +- LogicalProject(product_id=[$0]) + +- Uncollect + +- LogicalProject(product_ids=[$cor3.product_ids]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[order_id, customer_id, f0 AS product_id, ts]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor3.product_ids)], correlate=[table($UNNEST_ROWS$1($cor3.product_ids))], select=[order_id,customer_id,product_id,product_ids,ts,f0], rowType=[RecordType(INTEGER order_id, INTEGER customer_id, INTEGER product_id, INTEGER ARRAY product_ids, TIMESTAMP_LTZ(3) *ROWTIME* ts, INTEGER f0)], joinType=[INNER]) + +- WatermarkAssigner(rowtime=[ts], watermark=[ts]) + +- TableSourceScan(table=[[cat, default, t]], fields=[order_id, customer_id, product_id, product_ids, ts]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml index ad37a1cdc1a..88ca88bf901 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml @@ -144,18 +144,17 @@ DataStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c] <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2]) -+- LogicalProject(a=[CAST($0):INTEGER NOT NULL], b=[$1], c=[CAST($2):INTEGER]) ++- LogicalProject(a=[$0], b=[$1], c=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)]) +- LogicalTableScan(table=[[default_catalog, default_database, t1]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[CAST(a AS INTEGER) AS a, b, CAST(EXPR$2 AS INTEGER) AS c]) -+- HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b, Final_COUNT(count$0) AS EXPR$2]) - +- Exchange(distribution=[hash[a, b]]) - +- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0]) - +- BoundedStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c]) +HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b, Final_COUNT(count$0) AS EXPR$2]) ++- Exchange(distribution=[hash[a, b]]) + +- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0]) + +- BoundedStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c]) ]]> </Resource> </TestCase> @@ -189,17 +188,16 @@ Calc(select=[f0, name AS f1]) <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2]) -+- LogicalProject(a=[CAST($0):INTEGER NOT NULL], b=[$1], c=[CAST($2):INTEGER]) ++- LogicalProject(a=[$0], b=[$1], c=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)]) +- LogicalTableScan(table=[[default_catalog, default_database, t1]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[CAST(a AS INTEGER) AS a, b, CAST(EXPR$2 AS INTEGER) AS c]) -+- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(c) AS EXPR$2]) - +- Exchange(distribution=[hash[a, b]]) - +- DataStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c]) +GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(c) AS EXPR$2]) ++- Exchange(distribution=[hash[a, b]]) + +- DataStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml index b15149604fe..742207b6bd0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml @@ -2462,28 +2462,23 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Exchange(distribution=[hash[a]])(reuse_id=[1]) -+- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) - +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) - -TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])(reuse_id=[2]) +Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, day, b0, day0, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])(reuse_id=[1]) +:- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) +: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b]) ++- Exchange(distribution=[hash[d]]) + +- Calc(select=[b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd')) AS day, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3]) +- GroupAggregate(groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3]) +- Exchange(distribution=[hash[a, day]]) - +- Calc(select=[a, day, b, c]) - +- Join(joinType=[InnerJoin], where=[(a = d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) - :- Reused(reference_id=[1]) - +- Exchange(distribution=[hash[d]]) - +- Reused(reference_id=[2]) + +- Calc(select=[a, day, b0 AS b, c]) + +- Reused(reference_id=[1]) -Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c]) -+- Calc(select=[a, day, b, c]) - +- Join(joinType=[InnerJoin], where=[(a = d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) - :- Reused(reference_id=[1]) - +- Exchange(distribution=[hash[d]]) - +- Calc(select=[b, c, d], where=[(b > 100)]) - +- Reused(reference_id=[2]) +Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b0, c]) ++- Calc(select=[a, day, b0, c], where=[(b0 > 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -2547,17 +2542,18 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, b, d]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -TableSourceScan(table=[[default_catalog, default_database, nested_src, project=[deepNested, deepNested_nested1_value, deepNested_nested2_num, metadata_1], metadata=[metadata_1]]], fields=[deepNested, deepNested_nested1_value, deepNested_nested2_num, metadata_1])(reuse_id=[1]) +Calc(select=[id, deepNested_nested2_num AS a, deepNested_nested1_name AS name, ((deepNested_nested1_value + deepNested_nested2_num) + metadata_1) AS b])(reuse_id=[1]) ++- TableSourceScan(table=[[default_catalog, default_database, nested_src, project=[id, deepNested_nested2_num, deepNested_nested1_name, deepNested_nested1_value], metadata=[metadata_1]]], fields=[id, deepNested_nested2_num, deepNested_nested1_name, deepNested_nested1_value, metadata_1]) Sink(table=[default_catalog.default_database.sink1], fields=[a, b, d]) +- Calc(select=[a, day AS b, CAST(EXPR$2 AS BIGINT) AS d]) +- GroupAggregate(groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2]) +- Exchange(distribution=[hash[a, day]]) - +- Calc(select=[deepNested_nested2_num AS a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day, ((deepNested_nested1_value + deepNested_nested2_num) + metadata_1) AS b]) + +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day, b]) +- Reused(reference_id=[1]) Sink(table=[default_catalog.default_database.sink2], fields=[a, b, d]) -+- Calc(select=[deepNested.nested2.num AS a, deepNested.nested1.name AS b, CAST(((deepNested.nested1.value + deepNested.nested2.num) + metadata_1) AS BIGINT) AS d], where=[(((deepNested.nested1.value + deepNested.nested2.num) + metadata_1) > 100)]) ++- Calc(select=[a, name AS b, CAST(b AS BIGINT) AS d], where=[(b > 100)]) +- Reused(reference_id=[1]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index 75fb934413f..9d83070a4f1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -2209,22 +2209,24 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w </Resource> <Resource name="optimized exec plan"> <![CDATA[ -TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1]) +WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])(reuse_id=[1]) ++- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, wAvg]) +- Calc(select=[window_start, window_end, wAvg]) - +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[single]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- Calc(select=[window_start, window_end, b, e]) +- Reused(reference_id=[1]) Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, EXPR$2]) +- Calc(select=[window_start, window_end, EXPR$2]) - +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[single]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) - +- Calc(select=[rowtime]) - +- Reused(reference_id=[1]) + +- Calc(select=[window_start, window_end]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -2254,24 +2256,26 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w </Resource> <Resource name="optimized exec plan"> <![CDATA[ -TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1]) +WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])(reuse_id=[1]) ++- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, wAvg]) +- Calc(select=[window_start, window_end, wAvg]) - +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start, end('w$) AS window_end]) + +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[15 min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[single]) - +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS $window_end]) + +- Calc(select=[window_start, window_end, b, e]) +- Reused(reference_id=[1]) Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, EXPR$2]) +- Calc(select=[window_start, window_end, EXPR$2]) - +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end]) + +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[15 min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[single]) - +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) - +- Calc(select=[rowtime]) - +- Reused(reference_id=[1]) + +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) + +- Calc(select=[window_start, window_end]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase>