This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new d014c22d880 [FLINK-37098] Fix selecting time attribute from a view
d014c22d880 is described below

commit d014c22d88029aefe578caf20bfad254bace2da4
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Jan 10 15:18:37 2025 +0100

    [FLINK-37098] Fix selecting time attribute from a view
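
    For context, a minimal scenario exercising the fix (a sketch adapted from
    the test added in this commit; table and view names are illustrative):

        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;

        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // A table with a rowtime attribute declared via WATERMARK.
        env.executeSql(
                "CREATE TABLE t (id INT, ts TIMESTAMP_LTZ(3), WATERMARK FOR ts AS ts) "
                        + "WITH ('connector' = 'datagen')");
        env.executeSql("CREATE VIEW v AS SELECT id, ts FROM t");
        // Before this fix, expanding the view could lose the *ROWTIME* flavor
        // of `ts`, so time-based operations over `v` could fail or mis-plan.
        env.executeSql("SELECT * FROM v");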
---
 .../flink/table/api/internal/ShowCreateUtil.java   |  6 ++-
 .../table/api/internal/TableEnvironmentImpl.java   |  2 +-
 .../apache/flink/table/catalog/CatalogManager.java | 57 +++++++++++++++++++-
 .../table/catalog/QueryOperationCatalogView.java   | 42 ++++++++++++---
 .../catalog/CatalogBaseTableResolutionTest.java    |  3 +-
 .../flink/table/catalog/CatalogManagerTest.java    |  4 +-
 .../flink/table/utils/CatalogManagerMocks.java     |  3 +-
 .../org/apache/flink/table/utils/ParserMock.java   |  3 +-
 .../org/apache/flink/table/catalog/Column.java     | 18 +++++++
 .../catalog/QueryOperationCatalogViewTable.java    |  4 +-
 .../table/planner/catalog/SqlCatalogViewTable.java | 62 +---------------------
 .../planner/catalog/JavaCatalogTableTest.java      | 25 +++++++++
 .../SqlNodeToOperationConversionTestBase.java      |  3 +-
 .../flink/table/planner/utils/PlannerMocks.java    |  3 +-
 .../analyze/NonDeterministicUpdateAnalyzerTest.xml | 62 ++++++++++++----------
 .../table/planner/catalog/JavaCatalogTableTest.xml | 28 +++++++++-
 .../planner/plan/common/ViewsExpandingTest.xml     | 20 ++++---
 .../plan/stream/sql/NonDeterministicDagTest.xml    | 36 ++++++-------
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 36 +++++++------
 19 files changed, 261 insertions(+), 156 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index b3a5d8e6f02..cf789115626 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -92,8 +92,10 @@ public class ShowCreateUtil {
                             "SHOW CREATE VIEW is only supported for views, but 
%s is a table. Please use SHOW CREATE TABLE instead.",
                             viewIdentifier.asSerializableString()));
         }
-        StringBuilder stringBuilder = new StringBuilder();
-        if (view.getOrigin() instanceof QueryOperationCatalogView) {
+        final StringBuilder stringBuilder = new StringBuilder();
+        final CatalogBaseTable origin = view.getOrigin();
+        if (origin instanceof QueryOperationCatalogView
+                && !((QueryOperationCatalogView) origin).supportsShowCreateView()) {
             throw new TableException(
                     "SHOW CREATE VIEW is not supported for views registered by Table API.");
         } else {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c4a58d9d351..c5c8df7e022 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -217,7 +217,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                         getParser()::parseSqlExpression,
                         isStreamingMode);
         catalogManager.initSchemaResolver(
-                isStreamingMode, operationTreeBuilder.getResolverBuilder());
+                isStreamingMode, operationTreeBuilder.getResolverBuilder(), getParser());
         this.operationCtx =
                 new ExecutableOperationContextImpl(
                         catalogManager,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index bba3673c435..3dca9ab10ba 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -43,9 +43,12 @@ import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
 import org.apache.flink.table.catalog.listener.CreateTableEvent;
 import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
 import org.apache.flink.table.catalog.listener.DropTableEvent;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -66,6 +69,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
@@ -93,6 +97,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
     private @Nullable String currentDatabaseName;
 
     private DefaultSchemaResolver schemaResolver;
+    private Parser parser;
 
     // The name of the built-in catalog
     private final String builtInCatalogName;
@@ -274,9 +279,12 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
      * @see TableEnvironmentImpl#create(EnvironmentSettings)
      */
     public void initSchemaResolver(
-            boolean isStreamingMode, ExpressionResolverBuilder expressionResolverBuilder) {
+            boolean isStreamingMode,
+            ExpressionResolverBuilder expressionResolverBuilder,
+            Parser parser) {
         this.schemaResolver =
                 new DefaultSchemaResolver(isStreamingMode, typeFactory, expressionResolverBuilder);
+        this.parser = parser;
     }
 
     /** Returns a {@link SchemaResolver} for creating {@link ResolvedSchema} from {@link Schema}. */
@@ -1496,8 +1504,53 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
         if (view instanceof ResolvedCatalogView) {
             return (ResolvedCatalogView) view;
         }
+
+        if (view instanceof QueryOperationCatalogView) {
+            final QueryOperation queryOperation =
+                    ((QueryOperationCatalogView) view).getQueryOperation();
+            return new ResolvedCatalogView(view, queryOperation.getResolvedSchema());
+        }
+
         final ResolvedSchema resolvedSchema = view.getUnresolvedSchema().resolve(schemaResolver);
-        return new ResolvedCatalogView(view, resolvedSchema);
+        final List<Operation> parse;
+        try {
+            parse = parser.parse(view.getExpandedQuery());
+        } catch (Throwable e) {
+            // in case of a failure during parsing, let the lower layers fail
+            return new ResolvedCatalogView(view, resolvedSchema);
+        }
+        if (parse.size() != 1 || !(parse.get(0) instanceof QueryOperation)) {
+            // parsing a view should result in a single query operation
+            // if it is not what we expect, we let the lower layers fail
+            return new ResolvedCatalogView(view, resolvedSchema);
+        } else {
+            final QueryOperation operation = (QueryOperation) parse.get(0);
+            final ResolvedSchema querySchema = operation.getResolvedSchema();
+            if (querySchema.getColumns().size() != resolvedSchema.getColumns().size()) {
+                // in case the query does not match the number of expected columns, let the lower
+                // layers fail
+                return new ResolvedCatalogView(view, resolvedSchema);
+            }
+            final ResolvedSchema renamedQuerySchema =
+                    new ResolvedSchema(
+                            IntStream.range(0, resolvedSchema.getColumnCount())
+                                    .mapToObj(
+                                            i ->
+                                                    querySchema
+                                                            .getColumn(i)
+                                                            .get()
+                                                            .rename(
+                                                                    resolvedSchema
+                                                                            .getColumnNames()
+                                                                            .get(i)))
+                                    .collect(Collectors.toList()),
+                            resolvedSchema.getWatermarkSpecs(),
+                            resolvedSchema.getPrimaryKey().orElse(null));
+            return new ResolvedCatalogView(
+                    // pass a view that has the query parsed and
+                    // validated already
+                    new QueryOperationCatalogView(operation, view), renamedQuerySchema);
+        }
     }
 
     /**
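
The crux of the fix is in the resolution logic above: the view's expanded query
is re-parsed, and the schema derived from the parsed query (which still carries
time attributes) is adopted, with each column renamed to the view's declared
name. A condensed sketch of that renaming step (same logic as the hunk above,
reflowed for readability):

    // querySchema: schema of the re-parsed expanded query (keeps time attributes)
    // resolvedSchema: the view's declared schema (keeps the column names)
    List<Column> renamedColumns =
            IntStream.range(0, resolvedSchema.getColumnCount())
                    .mapToObj(i -> querySchema.getColumn(i).get()
                            .rename(resolvedSchema.getColumnNames().get(i)))
                    .collect(Collectors.toList());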
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
index dc4f0a7c370..0a820bb3ca9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.operations.QueryOperation;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.Optional;
 
@@ -32,9 +34,16 @@ import java.util.Optional;
 public final class QueryOperationCatalogView implements CatalogView {
 
     private final QueryOperation queryOperation;
+    private final @Nullable CatalogView originalView;
 
     public QueryOperationCatalogView(QueryOperation queryOperation) {
+        this(queryOperation, null);
+    }
+
+    public QueryOperationCatalogView(
+            final QueryOperation queryOperation, final CatalogView originalView) {
         this.queryOperation = queryOperation;
+        this.originalView = originalView;
     }
 
     public QueryOperation getQueryOperation() {
@@ -48,17 +57,23 @@ public final class QueryOperationCatalogView implements CatalogView {
 
     @Override
     public Map<String, String> getOptions() {
-        throw new TableException("A view backed by a query operation has no options.");
+        if (originalView == null) {
+            throw new TableException("A view backed by a query operation has no options.");
+        } else {
+            return originalView.getOptions();
+        }
     }
 
     @Override
     public String getComment() {
-        return queryOperation.asSummaryString();
+        return Optional.ofNullable(originalView)
+                .map(CatalogView::getComment)
+                .orElseGet(queryOperation::asSummaryString);
     }
 
     @Override
     public QueryOperationCatalogView copy() {
-        return new QueryOperationCatalogView(queryOperation);
+        return new QueryOperationCatalogView(queryOperation, originalView);
     }
 
     @Override
@@ -73,13 +88,26 @@ public final class QueryOperationCatalogView implements CatalogView {
 
     @Override
     public String getOriginalQuery() {
-        throw new TableException(
-                "A view backed by a query operation has no serializable representation.");
+        if (originalView == null) {
+            throw new TableException(
+                    "A view backed by a query operation has no serializable representation.");
+        } else {
+            return originalView.getOriginalQuery();
+        }
     }
 
     @Override
     public String getExpandedQuery() {
-        throw new TableException(
-                "A view backed by a query operation has no serializable representation.");
+        if (originalView == null) {
+            throw new TableException(
+                    "A view backed by a query operation has no serializable representation.");
+        } else {
+            return originalView.getExpandedQuery();
+        }
+    }
+
+    @Internal
+    public boolean supportsShowCreateView() {
+        return originalView != null;
     }
 }
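
The wrapped originalView distinguishes views created from SQL (which keep a
serializable representation) from views registered through the Table API. A
short behavioral sketch (variables are illustrative):

    QueryOperationCatalogView apiView = new QueryOperationCatalogView(queryOperation);
    apiView.supportsShowCreateView(); // false; getExpandedQuery() throws TableException

    QueryOperationCatalogView sqlView = new QueryOperationCatalogView(queryOperation, originalView);
    sqlView.supportsShowCreateView(); // true; options, comment and queries delegate to originalView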
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index a9436ac21df..9afc5545d0a 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
+import org.apache.flink.table.utils.ParserMock;
 
 import org.junit.jupiter.api.Test;
 
@@ -482,7 +483,7 @@ class CatalogBaseTableResolutionTest {
                 ExpressionResolverMocks.forSqlExpression(
                         CatalogBaseTableResolutionTest::resolveSqlExpression);
 
-        catalogManager.initSchemaResolver(true, expressionResolverBuilder);
+        catalogManager.initSchemaResolver(true, expressionResolverBuilder, new ParserMock());
 
         return catalogManager;
     }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 5dac3b1add9..9470eb136f3 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
 import org.apache.flink.table.catalog.listener.DropTableEvent;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
+import org.apache.flink.table.utils.ParserMock;
 
 import org.junit.jupiter.api.Test;
 
@@ -129,7 +130,8 @@ class CatalogManagerTest {
                                 dropFuture,
                                 dropTemporaryFuture));
 
-        catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver());
+        catalogManager.initSchemaResolver(
+                true, ExpressionResolverMocks.dummyResolver(), new ParserMock());
         // Create a view
         catalogManager.createTable(
                 CatalogView.of(Schema.newBuilder().build(), null, "", "", Collections.emptyMap()),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
index f21ee3f1b85..3946294d480 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
@@ -71,7 +71,8 @@ public final class CatalogManagerMocks {
             builder.catalogStoreHolder(catalogStoreHolder);
         }
         final CatalogManager catalogManager = builder.build();
-        catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver());
+        catalogManager.initSchemaResolver(
+                true, ExpressionResolverMocks.dummyResolver(), new ParserMock());
         return catalogManager;
     }
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
index 526c682fb1d..9cce26051c6 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
@@ -27,13 +27,14 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
 
 /** Mocks {@link Parser} for tests. */
 public class ParserMock implements Parser {
     @Override
     public List<Operation> parse(String statement) {
-        return null;
+        return Collections.emptyList();
     }
 
     @Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
index faeae16b75b..11a72a0b48d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
@@ -135,6 +135,9 @@ public abstract class Column {
     /** Returns a copy of the column with a replaced {@link DataType}. */
     public abstract Column copy(DataType newType);
 
+    /** Returns a copy of the column with a replaced name. */
+    public abstract Column rename(String newName);
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -202,6 +205,11 @@ public abstract class Column {
         public Column copy(DataType newDataType) {
             return new PhysicalColumn(name, newDataType, comment);
         }
+
+        @Override
+        public Column rename(String newName) {
+            return new PhysicalColumn(newName, dataType, comment);
+        }
     }
 
     /** Representation of a computed column. */
@@ -252,6 +260,11 @@ public abstract class Column {
             return new ComputedColumn(name, newDataType, expression, comment);
         }
 
+        @Override
+        public Column rename(String newName) {
+            return new ComputedColumn(newName, dataType, expression, comment);
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -344,6 +357,11 @@ public abstract class Column {
             return new MetadataColumn(name, newDataType, metadataKey, isVirtual, comment);
         }
 
+        @Override
+        public Column rename(String newName) {
+            return new MetadataColumn(newName, dataType, metadataKey, isVirtual, comment);
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
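
A usage sketch for the new Column#rename (values are illustrative;
Column.physical and DataTypes are existing factories in the Table API):

    Column ts = Column.physical("ts", DataTypes.TIMESTAMP_LTZ(3));
    Column renamed = ts.rename("event_time");
    // Same data type and comment as `ts`; only the name changes. Each subclass
    // (physical, computed, metadata) returns a copy of its own kind.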
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 32353c182de..91a42179730 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 
@@ -77,6 +78,7 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable {
         final Context chain = Contexts.of(context, cluster.getPlanner().getContext());
         final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema());
 
-        return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
+        return RelOptUtil.createCastRel(
+                relBuilder.queryOperation(catalogView.getQueryOperation()).build(), rowType, true);
     }
 }
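
For reference, Calcite's RelOptUtil.createCastRel inserts a Project over the
input that casts each field to the corresponding field of the target row type
(renaming fields when the last argument is true), and returns the input
unchanged if the row types already match. The call shape (variables are
illustrative):

    // expanded: the plan built from the view's query operation
    RelNode casted = RelOptUtil.createCastRel(expanded, rowType, true);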
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
index a6ccd06f9cb..f4c60563d8c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
@@ -20,19 +20,15 @@ package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
-import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelRecordType;
 
 import javax.annotation.Nullable;
 
-import java.util.AbstractList;
 import java.util.List;
 
 /**
@@ -60,62 +56,6 @@ public class SqlCatalogViewTable extends ExpandingPreparingTable {
     public RelNode convertToRel(ToRelContext context) {
         RelNode original =
                 context.expandView(rowType, view.getExpandedQuery(), viewPath, names).project();
-        RelDataType castTargetType =
-                adaptTimeAttributes(
-                        original.getRowType(), rowType, context.getCluster().getTypeFactory());
-        return RelOptUtil.createCastRel(original, castTargetType, true);
-    }
-
-    private static RelDataType adaptTimeAttributes(
-            RelDataType queryType, RelDataType targetType, RelDataTypeFactory typeFactory) {
-        if (queryType instanceof RelRecordType) {
-            if (RelOptUtil.areRowTypesEqual(queryType, targetType, true)) {
-                return targetType;
-            } else if (targetType.getFieldCount() != queryType.getFieldCount()) {
-                throw new IllegalArgumentException(
-                        "Field counts are not equal: queryType ["
-                                + queryType
-                                + "]"
-                                + " castRowType ["
-                                + targetType
-                                + "]");
-            } else {
-                return adaptTimeAttributeInRecord(
-                        (RelRecordType) queryType, (RelRecordType) targetType, typeFactory);
-            }
-        } else {
-            return adaptTimeAttributeInSimpleType(queryType, targetType, typeFactory);
-        }
-    }
-
-    private static RelDataType adaptTimeAttributeInRecord(
-            RelRecordType queryType, RelRecordType targetType, RelDataTypeFactory typeFactory) {
-        RelDataType structType =
-                typeFactory.createStructType(
-                        targetType.getStructKind(),
-                        new AbstractList<RelDataType>() {
-                            public RelDataType get(int index) {
-                                RelDataType targetFieldType =
-                                        (targetType.getFieldList().get(index)).getType();
-                                RelDataType queryFieldType =
-                                        (queryType.getFieldList().get(index)).getType();
-                                return adaptTimeAttributes(
-                                        queryFieldType, targetFieldType, typeFactory);
-                            }
-
-                            public int size() {
-                                return targetType.getFieldCount();
-                            }
-                        },
-                        targetType.getFieldNames());
-        return typeFactory.createTypeWithNullability(structType, targetType.isNullable());
-    }
-
-    private static RelDataType adaptTimeAttributeInSimpleType(
-            RelDataType queryType, RelDataType targetType, RelDataTypeFactory typeFactory) {
-        if (queryType instanceof TimeIndicatorRelDataType) {
-            return typeFactory.createTypeWithNullability(queryType, targetType.isNullable());
-        }
-        return targetType;
+        return RelOptUtil.createCastRel(original, rowType, true);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
index 419d14987f7..3b17fb8d464 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
@@ -177,6 +177,31 @@ class JavaCatalogTableTest extends TableTestBase {
                         + "GROUP BY window_start, window_end");
     }
 
+    @TestTemplate
+    void testTimeAttributeOfViewSelect() {
+        if (!streamingMode) {
+            // time attributes not supported in batch
+            return;
+        }
+        TableTestUtil testUtil = getTestUtil();
+        TableEnvironment tableEnvironment = testUtil.getTableEnv();
+        tableEnvironment.registerCatalog("cat", new CustomCatalog("cat"));
+        tableEnvironment.executeSql(
+                "CREATE TABLE `cat`.`default`.`t`("
+                        + " order_id INT, "
+                        + " customer_id INT, "
+                        + " product_id INT, "
+                        + " product_ids ARRAY<INT>, "
+                        + " ts TIMESTAMP_LTZ(3), WATERMARK FOR ts AS ts) "
+                        + "WITH ('connector' = 'datagen')");
+        tableEnvironment.executeSql(
+                "CREATE VIEW `cat`.`default`.v AS "
+                        + "SELECT `o`.`order_id`, `o`.`customer_id`, `pids`.`product_id`, `o`.`ts`\n"
+                        + "FROM `cat`.`default`.`t` AS `o`\n"
+                        + "CROSS JOIN UNNEST(`o`.`product_ids`) AS `pids` (`product_id`)");
+        testUtil.verifyExecPlan("SELECT * FROM `cat`.`default`.v");
+    }
+
     private static class CustomCatalog extends GenericInMemoryCatalog {
         public CustomCatalog(String name) {
             super(name);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 2b87c89ebf8..38bff172d09 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -99,7 +99,8 @@ public class SqlNodeToOperationConversionTestBase {
     public void before() throws TableAlreadyExistException, DatabaseNotExistException {
         catalogManager.initSchemaResolver(
                 isStreamingMode,
-                ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser));
+                ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser),
+                parser);
 
         final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
         final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index b852661f34e..1b430e0f448 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -105,7 +105,8 @@ public class PlannerMocks {
                         },
                         functionCatalog.asLookup(parser::parseIdentifier),
                         catalogManager.getDataTypeFactory(),
-                        parser::parseSqlExpression));
+                        parser::parseSqlExpression),
+                parser);
     }
 
     public FlinkPlannerImpl getPlanner() {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
index 9726b5753b9..a2bb0a112ab 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
@@ -77,6 +77,26 @@ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c])
 
 advice[1]: [WARNING] You might want to enable upsert materialization for look up join operator by configuring ('table.optimizer.non-deterministic-update.strategy' to 'TRY_RESOLVE') to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline.
 
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCdcSourceWithoutPkSinkWithoutPk">
+    <Resource name="sql">
+      <![CDATA[insert into sink_without_pk
+select metadata_1, b, metadata_2
+from cdc_without_pk]]>
+    </Resource>
+    <Resource name="optimized rel plan with advice">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink_without_pk], fields=[metadata_1, b, metadata_2])
++- Calc(select=[metadata_1, b, metadata_2])
+   +- TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2])
+
+advice[1]: [WARNING] The metadata column(s): 'metadata_1, metadata_2' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
+source node:
+TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2], changelogMode=[I,UB,UA,D])
+
+
 ]]>
     </Resource>
   </TestCase>
@@ -157,45 +177,31 @@ Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day],
 Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3])
 +- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3])
    +- Exchange(distribution=[hash[a, day]])
-      +- Calc(select=[a, day, b, c])
-         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+      +- Calc(select=[a, day, b0 AS b, c])
+         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, day, b0, day0, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
             :- Exchange(distribution=[hash[a]])
-            :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
-            :     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a])
+            :  +- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
+            :     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b])
             +- Exchange(distribution=[hash[d]])
-               +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
+               +- Calc(select=[b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd')) AS day, c, d])
+                  +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
 
-Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
-+- Calc(select=[a, day, b, c])
-   +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b0, c])
++- Calc(select=[a, day, b0, c], where=[>(b0, 100)])
+   +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, day, b0, day0, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
-      :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a])
+      :  +- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b])
       +- Exchange(distribution=[hash[d]])
-         +- Calc(select=[b, c, d], where=[>(b, 100)])
+         +- Calc(select=[b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd')) AS day, c, d])
            +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
 
 advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value).
 advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
 
 related rel plan:
-Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D])
-+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D])
-
-
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testCdcSourceWithoutPkSinkWithoutPk">
-    <Resource name="optimized rel plan with advice">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.sink_without_pk], fields=[metadata_1, b, metadata_2])
-+- Calc(select=[metadata_1, b, metadata_2])
-   +- TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2])
-
-advice[1]: [WARNING] The metadata column(s): 'metadata_1, metadata_2' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
-source node:
-TableSourceScan(table=[[default_catalog, default_database, cdc_without_pk, project=[b], metadata=[metadata_1, metadata_2]]], fields=[b, metadata_1, metadata_2], changelogMode=[I,UB,UA,D])
+Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D])
++- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])
 
 
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
index 224f8503f5a..a8b689b40b0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
@@ -161,7 +161,7 @@ GROUP BY window_start, window_end]]>
 LogicalProject(EXPR$0=[$2], window_start=[$0])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
    +- LogicalProject(window_start=[$2], window_end=[$3], i=[$0])
-      +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER i, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) window_time)])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER i, TIMESTAMP_LTZ(3) *ROWTIME* ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *ROWTIME* window_time)])
          +- LogicalProject(i=[$0], ts=[$1])
             +- LogicalProject(i=[$0], ts=[$1])
                +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$1])
@@ -176,6 +176,32 @@ Calc(select=[EXPR$0, window_start])
       +- LocalWindowAggregate(window=[TUMBLE(time_col=[ts], size=[10 min])], select=[SUM(i) AS sum$0, slice_end('w$) AS $slice_end])
          +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
            +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeAttributeOfViewSelect[streamingMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM `cat`.`default`.v]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(order_id=[$0], customer_id=[$1], product_id=[$2], ts=[$3])
++- LogicalProject(order_id=[$0], customer_id=[$1], product_id=[$5], ts=[$4])
+   +- LogicalCorrelate(correlation=[$cor3], joinType=[inner], requiredColumns=[{3}])
+      :- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$4])
+      :  +- LogicalTableScan(table=[[cat, default, t]])
+      +- LogicalProject(product_id=[$0])
+         +- Uncollect
+            +- LogicalProject(product_ids=[$cor3.product_ids])
+               +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[order_id, customer_id, f0 AS product_id, ts])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor3.product_ids)], correlate=[table($UNNEST_ROWS$1($cor3.product_ids))], select=[order_id,customer_id,product_id,product_ids,ts,f0], rowType=[RecordType(INTEGER order_id, INTEGER customer_id, INTEGER product_id, INTEGER ARRAY product_ids, TIMESTAMP_LTZ(3) *ROWTIME* ts, INTEGER f0)], joinType=[INNER])
+   +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
+      +- TableSourceScan(table=[[cat, default, t]], fields=[order_id, customer_id, product_id, product_ids, ts])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
index ad37a1cdc1a..88ca88bf901 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
@@ -144,18 +144,17 @@ DataStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c]
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalProject(a=[CAST($0):INTEGER NOT NULL], b=[$1], c=[CAST($2):INTEGER])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[CAST(a AS INTEGER) AS a, b, CAST(EXPR$2 AS INTEGER) AS c])
-+- HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b, Final_COUNT(count$0) AS EXPR$2])
-   +- Exchange(distribution=[hash[a, b]])
-      +- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0])
-         +- BoundedStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c])
+HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b, Final_COUNT(count$0) AS EXPR$2])
++- Exchange(distribution=[hash[a, b]])
+   +- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0])
+      +- BoundedStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -189,17 +188,16 @@ Calc(select=[f0, name AS f1])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalProject(a=[CAST($0):INTEGER NOT NULL], b=[$1], c=[CAST($2):INTEGER])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[CAST(a AS INTEGER) AS a, b, CAST(EXPR$2 AS INTEGER) AS c])
-+- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(c) AS EXPR$2])
-   +- Exchange(distribution=[hash[a, b]])
-      +- DataStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c])
+GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(c) AS EXPR$2])
++- Exchange(distribution=[hash[a, b]])
+   +- DataStreamScan(table=[[default_catalog, default_database, t1]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index b15149604fe..742207b6bd0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -2462,28 +2462,23 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Exchange(distribution=[hash[a]])(reuse_id=[1])
-+- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
-   +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a])
-
-TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])(reuse_id=[2])
+Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, day, b0, day0, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])(reuse_id=[1])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
+:     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b], metadata=[]]], fields=[a, b])
++- Exchange(distribution=[hash[d]])
+   +- Calc(select=[b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd')) AS day, c, d])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
 
 Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3])
 +- GroupAggregate(groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3])
    +- Exchange(distribution=[hash[a, day]])
-      +- Calc(select=[a, day, b, c])
-         +- Join(joinType=[InnerJoin], where=[(a = d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-            :- Reused(reference_id=[1])
-            +- Exchange(distribution=[hash[d]])
-               +- Reused(reference_id=[2])
+      +- Calc(select=[a, day, b0 AS b, c])
+         +- Reused(reference_id=[1])
 
-Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
-+- Calc(select=[a, day, b, c])
-   +- Join(joinType=[InnerJoin], where=[(a = d)], select=[a, day, b, c, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-      :- Reused(reference_id=[1])
-      +- Exchange(distribution=[hash[d]])
-         +- Calc(select=[b, c, d], where=[(b > 100)])
-            +- Reused(reference_id=[2])
+Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b0, c])
++- Calc(select=[a, day, b0, c], where=[(b0 > 100)])
+   +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -2547,17 +2542,18 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, b, d])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, nested_src, project=[deepNested, deepNested_nested1_value, deepNested_nested2_num, metadata_1], metadata=[metadata_1]]], fields=[deepNested, deepNested_nested1_value, deepNested_nested2_num, metadata_1])(reuse_id=[1])
+Calc(select=[id, deepNested_nested2_num AS a, deepNested_nested1_name AS name, ((deepNested_nested1_value + deepNested_nested2_num) + metadata_1) AS b])(reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, nested_src, project=[id, deepNested_nested2_num, deepNested_nested1_name, deepNested_nested1_value], metadata=[metadata_1]]], fields=[id, deepNested_nested2_num, deepNested_nested1_name, deepNested_nested1_value, metadata_1])
 
 Sink(table=[default_catalog.default_database.sink1], fields=[a, b, d])
 +- Calc(select=[a, day AS b, CAST(EXPR$2 AS BIGINT) AS d])
    +- GroupAggregate(groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2])
       +- Exchange(distribution=[hash[a, day]])
-         +- Calc(select=[deepNested_nested2_num AS a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day, ((deepNested_nested1_value + deepNested_nested2_num) + metadata_1) AS b])
+         +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day, b])
            +- Reused(reference_id=[1])
 
 Sink(table=[default_catalog.default_database.sink2], fields=[a, b, d])
-+- Calc(select=[deepNested.nested2.num AS a, deepNested.nested1.name AS b, CAST(((deepNested.nested1.value + deepNested.nested2.num) + metadata_1) AS BIGINT) AS d], where=[(((deepNested.nested1.value + deepNested.nested2.num) + metadata_1) > 100)])
++- Calc(select=[a, name AS b, CAST(b AS BIGINT) AS d], where=[(b > 100)])
    +- Reused(reference_id=[1])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 75fb934413f..9d83070a4f1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2209,22 +2209,24 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1])
+WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])(reuse_id=[1])
++- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+   +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
 
 Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, wAvg])
 +- Calc(select=[window_start, window_end, wAvg])
-   +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS window_end])
+   +- WindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS window_end])
       +- Exchange(distribution=[single])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+         +- Calc(select=[window_start, window_end, b, e])
            +- Reused(reference_id=[1])
 
 Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, EXPR$2])
 +- Calc(select=[window_start, window_end, EXPR$2])
-   +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
+   +- WindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
       +- Exchange(distribution=[single])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
-            +- Calc(select=[rowtime])
-               +- Reused(reference_id=[1])
+         +- Calc(select=[window_start, window_end])
+            +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -2254,24 +2256,26 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1])
+WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])(reuse_id=[1])
++- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+   +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
 
 Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, wAvg])
 +- Calc(select=[window_start, window_end, wAvg])
-   +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start, end('w$) AS window_end])
+   +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[15 min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start, end('w$) AS window_end])
       +- Exchange(distribution=[single])
-         +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS $slice_end])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+         +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS $window_end])
+            +- Calc(select=[window_start, window_end, b, e])
               +- Reused(reference_id=[1])
 
 Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_end, EXPR$2])
 +- Calc(select=[window_start, window_end, EXPR$2])
-   +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
+   +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[15 min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
       +- Exchange(distribution=[single])
-         +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
-               +- Calc(select=[rowtime])
-                  +- Reused(reference_id=[1])
+         +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
+            +- Calc(select=[window_start, window_end])
+               +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
