twalthr commented on code in PR #25779:
URL: https://github.com/apache/flink/pull/25779#discussion_r1878142179


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java:
##########
@@ -149,12 +157,129 @@ void testResolvingProctimeOfCustomTableTableApi() throws 
Exception {
         testUtil.verifyExecPlan(table);
     }
 
+    @TestTemplate
+    void testTimeAttributeOfView() {
+        if (!streamingMode) {
+            // time attributes not supported in batch
+            return;
+        }
+        TableTestUtil testUtil = getTestUtil();
+        TableEnvironment tableEnvironment = testUtil.getTableEnv();
+        tableEnvironment.registerCatalog("cat", new CustomCatalog("cat"));
+        tableEnvironment.executeSql(
+                "CREATE TABLE t(i INT, ts TIMESTAMP_LTZ(3), WATERMARK FOR "
+                        + "ts AS ts) WITH ('connector' = 'datagen')");
+        tableEnvironment.executeSql("CREATE VIEW `cat`.`default`.v AS SELECT * 
FROM t");
+        testUtil.verifyExecPlan(
+                "SELECT sum(i), window_start "
+                        + "FROM TABLE(\n"
+                        + "   TUMBLE(\n"
+                        + "     DATA => TABLE `cat`.`default`.v,\n"
+                        + "     TIMECOL => DESCRIPTOR(ts),\n"
+                        + "     SIZE => INTERVAL '10' MINUTES))"
+                        + "GROUP BY window_start, window_end");
+    }
+
+    private static class CustomCatalog extends GenericInMemoryCatalog {
+        public CustomCatalog(String name) {
+            super(name);
+        }
+
+        @Override
+        public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException {
+            CatalogBaseTable table = super.getTable(tablePath);
+            if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+                return new CustomView((CatalogView) table);
+            }
+            return table;
+        }
+    }
+
+    private static class CustomView implements CatalogView {
+
+        private final CatalogView origin;
+
+        public CustomView(CatalogView table) {
+            this.origin = table;
+        }
+
+        @Override
+        public String getOriginalQuery() {
+            return origin.getOriginalQuery();
+        }
+
+        @Override
+        public String getExpandedQuery() {
+            return origin.getExpandedQuery();
+        }
+
+        @Override
+        public Map<String, String> getOptions() {
+            return origin.getOptions();
+        }
+
+        @Override
+        public Schema getUnresolvedSchema() {
+            Schema originalSchema = origin.getUnresolvedSchema();
+            return Schema.newBuilder()
+                    .fromColumns(
+                            originalSchema.getColumns().stream()
+                                    .map(
+                                            c -> {
+                                                if (c instanceof 
Schema.UnresolvedPhysicalColumn) {
+                                                    DataType dataType =
+                                                            (DataType)
+                                                                    ((Schema
+                                                                               
             .UnresolvedPhysicalColumn)

Review Comment:
   nit: static import on `UnresolvedPhysicalColumn`



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java:
##########
@@ -149,12 +157,129 @@ void testResolvingProctimeOfCustomTableTableApi() throws 
Exception {
         testUtil.verifyExecPlan(table);
     }
 
+    @TestTemplate
+    void testTimeAttributeOfView() {
+        if (!streamingMode) {
+            // time attributes not supported in batch
+            return;
+        }
+        TableTestUtil testUtil = getTestUtil();
+        TableEnvironment tableEnvironment = testUtil.getTableEnv();
+        tableEnvironment.registerCatalog("cat", new CustomCatalog("cat"));
+        tableEnvironment.executeSql(
+                "CREATE TABLE t(i INT, ts TIMESTAMP_LTZ(3), WATERMARK FOR "
+                        + "ts AS ts) WITH ('connector' = 'datagen')");
+        tableEnvironment.executeSql("CREATE VIEW `cat`.`default`.v AS SELECT * 
FROM t");
+        testUtil.verifyExecPlan(
+                "SELECT sum(i), window_start "
+                        + "FROM TABLE(\n"
+                        + "   TUMBLE(\n"
+                        + "     DATA => TABLE `cat`.`default`.v,\n"
+                        + "     TIMECOL => DESCRIPTOR(ts),\n"
+                        + "     SIZE => INTERVAL '10' MINUTES))"
+                        + "GROUP BY window_start, window_end");
+    }
+
+    private static class CustomCatalog extends GenericInMemoryCatalog {
+        public CustomCatalog(String name) {
+            super(name);
+        }
+
+        @Override
+        public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException {
+            CatalogBaseTable table = super.getTable(tablePath);
+            if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+                return new CustomView((CatalogView) table);
+            }
+            return table;
+        }
+    }
+
+    private static class CustomView implements CatalogView {
+
+        private final CatalogView origin;
+
+        public CustomView(CatalogView table) {
+            this.origin = table;
+        }
+
+        @Override
+        public String getOriginalQuery() {
+            return origin.getOriginalQuery();
+        }
+
+        @Override
+        public String getExpandedQuery() {
+            return origin.getExpandedQuery();
+        }
+
+        @Override
+        public Map<String, String> getOptions() {
+            return origin.getOptions();
+        }
+
+        @Override
+        public Schema getUnresolvedSchema() {
+            Schema originalSchema = origin.getUnresolvedSchema();
+            return Schema.newBuilder()
+                    .fromColumns(
+                            originalSchema.getColumns().stream()
+                                    .map(
+                                            c -> {
+                                                if (c instanceof 
Schema.UnresolvedPhysicalColumn) {
+                                                    DataType dataType =
+                                                            (DataType)
+                                                                    ((Schema
+                                                                               
             .UnresolvedPhysicalColumn)
+                                                                               
     c)
+                                                                            
.getDataType();
+                                                    String stringType =
+                                                            
dataType.getLogicalType()
+                                                                    
.asSerializableString();
+                                                    LogicalType 
parsedLogicalType =

Review Comment:
   use `DataTypes.of`



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java:
##########
@@ -149,12 +157,129 @@ void testResolvingProctimeOfCustomTableTableApi() throws 
Exception {
         testUtil.verifyExecPlan(table);
     }
 
+    @TestTemplate
+    void testTimeAttributeOfView() {
+        if (!streamingMode) {
+            // time attributes not supported in batch
+            return;
+        }
+        TableTestUtil testUtil = getTestUtil();
+        TableEnvironment tableEnvironment = testUtil.getTableEnv();
+        tableEnvironment.registerCatalog("cat", new CustomCatalog("cat"));
+        tableEnvironment.executeSql(
+                "CREATE TABLE t(i INT, ts TIMESTAMP_LTZ(3), WATERMARK FOR "
+                        + "ts AS ts) WITH ('connector' = 'datagen')");
+        tableEnvironment.executeSql("CREATE VIEW `cat`.`default`.v AS SELECT * 
FROM t");
+        testUtil.verifyExecPlan(
+                "SELECT sum(i), window_start "
+                        + "FROM TABLE(\n"
+                        + "   TUMBLE(\n"
+                        + "     DATA => TABLE `cat`.`default`.v,\n"
+                        + "     TIMECOL => DESCRIPTOR(ts),\n"
+                        + "     SIZE => INTERVAL '10' MINUTES))"
+                        + "GROUP BY window_start, window_end");
+    }
+
+    private static class CustomCatalog extends GenericInMemoryCatalog {
+        public CustomCatalog(String name) {
+            super(name);
+        }
+
+        @Override
+        public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException {
+            CatalogBaseTable table = super.getTable(tablePath);
+            if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+                return new CustomView((CatalogView) table);
+            }
+            return table;
+        }
+    }
+
+    private static class CustomView implements CatalogView {
+
+        private final CatalogView origin;
+
+        public CustomView(CatalogView table) {
+            this.origin = table;
+        }
+
+        @Override
+        public String getOriginalQuery() {
+            return origin.getOriginalQuery();
+        }
+
+        @Override
+        public String getExpandedQuery() {
+            return origin.getExpandedQuery();
+        }
+
+        @Override
+        public Map<String, String> getOptions() {
+            return origin.getOptions();
+        }
+
+        @Override
+        public Schema getUnresolvedSchema() {
+            Schema originalSchema = origin.getUnresolvedSchema();
+            return Schema.newBuilder()
+                    .fromColumns(
+                            originalSchema.getColumns().stream()
+                                    .map(
+                                            c -> {
+                                                if (c instanceof 
Schema.UnresolvedPhysicalColumn) {
+                                                    DataType dataType =
+                                                            (DataType)
+                                                                    ((Schema
+                                                                               
             .UnresolvedPhysicalColumn)
+                                                                               
     c)
+                                                                            
.getDataType();
+                                                    String stringType =
+                                                            
dataType.getLogicalType()
+                                                                    
.asSerializableString();
+                                                    LogicalType 
parsedLogicalType =
+                                                            
LogicalTypeParser.parse(
+                                                                    stringType,
+                                                                    
this.getClass()
+                                                                            
.getClassLoader());
+                                                    return new 
Schema.UnresolvedPhysicalColumn(
+                                                            c.getName(),
+                                                            
TypeConversions.fromLogicalToDataType(

Review Comment:
   use `DataTypes.of()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to