fsk119 commented on code in PR #20822:
URL: https://github.com/apache/flink/pull/20822#discussion_r1288206191


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala:
##########
@@ -54,6 +60,45 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
           idx => assertNull(mq.areColumnsUnique(scan, ImmutableBitSet.of(idx)))
         }
     }
+
+    // Test intermediate table scan.
+    relBuilder.clear()
+    val studentRel = relBuilder
+      .scan("student")
+      .build()
+    val flinkLogicalIntermediateTableScan: FlinkLogicalIntermediateTableScan =
+      createIntermediateScan(
+        studentRel,
+        flinkLogicalTraits,
+        Set(ImmutableBitSet.of(0)),
+        new FlinkStatistic(new TableStats(1000L), Set(Set("name").asJava).asJava))

Review Comment:
   I think it should be
   ```
   createIntermediateScan(
     studentRel,
     flinkLogicalTraits,
     Set(ImmutableBitSet.of(1)),
     new FlinkStatistic(new TableStats(1000L), Set(Set("name").asJava).asJava))
   ```
   
   Because the current student schema is 
   ```
       val schema = new TableSchema(
         Array("id", "name", "score", "age", "height", "sex", "class"),
         Array(
           BasicTypeInfo.LONG_TYPE_INFO,
           BasicTypeInfo.STRING_TYPE_INFO,
           BasicTypeInfo.DOUBLE_TYPE_INFO,
           BasicTypeInfo.INT_TYPE_INFO,
           BasicTypeInfo.DOUBLE_TYPE_INFO,
           BasicTypeInfo.STRING_TYPE_INFO,
           BasicTypeInfo.INT_TYPE_INFO
         ))
   ```
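   
   For clarity, a minimal sketch of the zero-based index mapping implied by that schema (the `nameKey` value is just for illustration, not part of the test):
   ```
   import org.apache.calcite.util.ImmutableBitSet
   
   // Zero-based field indices in the student schema above:
   // 0 -> id, 1 -> name, 2 -> score, 3 -> age, 4 -> height, 5 -> sex, 6 -> class
   // So a unique key on "name" corresponds to bit 1, not bit 0:
   val nameKey = ImmutableBitSet.of(1)
   ```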



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala:
##########
@@ -54,6 +60,45 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
           idx => assertNull(mq.areColumnsUnique(scan, ImmutableBitSet.of(idx)))
         }
     }
+
+    // Test intermediate table scan.
+    relBuilder.clear()
+    val studentRel = relBuilder
+      .scan("student")
+      .build()
+    val flinkLogicalIntermediateTableScan: FlinkLogicalIntermediateTableScan =
+      createIntermediateScan(
+        studentRel,
+        flinkLogicalTraits,
+        Set(ImmutableBitSet.of(0)),
+        new FlinkStatistic(new TableStats(1000L), Set(Set("name").asJava).asJava))
+    val batchPhysicalIntermediateTableScan: BatchPhysicalIntermediateTableScan =
+      createIntermediateScan(
+        studentRel,
+        batchPhysicalTraits,
+        Set(ImmutableBitSet.of(0)),
+        new FlinkStatistic(new TableStats(1000L), Set(Set("name").asJava).asJava))
+    val streamPhysicalIntermediateTableScan: StreamPhysicalIntermediateTableScan =
+      createIntermediateScan(
+        studentRel,
+        streamPhysicalTraits,
+        Set(ImmutableBitSet.of(0)),
+        new FlinkStatistic(new TableStats(1000L), Set(Set("name").asJava).asJava))
+    Array(
+      flinkLogicalIntermediateTableScan,
+      batchPhysicalIntermediateTableScan,
+      streamPhysicalIntermediateTableScan).foreach {
+      scan =>
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of()))
+        assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))

Review Comment:
   It's very strange. If the intermediate table has an upsert key, I think we should respect it.
   
   In this case, if you treat the column `name` as the upsert key, I think `id` should not be unique. For example, the intermediate table could represent the query
   
   ```
   SELECT 1 AS id, name FROM STUDENTS GROUP BY name
   ```
   
   In this case, I think we cannot make that promise.
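   
   To make this concrete, a hypothetical result set for that query (the names are made up):
   ```
   id | name
   ---+-------
    1 | Alice
    1 | Bob
   ```
   Here `id` (column 0) repeats across rows while `name` (column 1) is unique, so asserting uniqueness on column 0 does not hold.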
   



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala:
##########
@@ -334,6 +334,43 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyExecPlan(sqlQuery)
   }
 
+  @Test
+  def testSubplanReuseOnSortedView(): Unit = {
+    util.addTableSource[(Int, Long, String, String, String)]("Source", 'a, 'b, 'c, 'd, 'e)
+    val query = "SELECT * FROM Source order by d"
+    val table = util.tableEnv.sqlQuery(query)
+    // Define a sorted view.
+    util.tableEnv.createTemporaryView("SortedView", table)
+    util.tableEnv.executeSql("""
+                               |CREATE TABLE Sink1 (
+                               |   a int,
+                               |   b bigint,
+                               |   c string
+                               |) WITH (
+                               |   'connector' = 'filesystem',

Review Comment:
   nit: it's better to use the values source/sink in the test.
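   
   For example, a minimal sketch of the sink DDL using the planner's testing values connector (assuming `TestValuesTableFactory` is on the test classpath; the `sink-insert-only` setting is my assumption of a reasonable option here):
   ```
   util.tableEnv.executeSql("""
                              |CREATE TABLE Sink1 (
                              |   a int,
                              |   b bigint,
                              |   c string
                              |) WITH (
                              |   -- 'values' is the in-memory testing connector, no path needed
                              |   'connector' = 'values',
                              |   -- assumption: keep the sink insert-only for this batch test
                              |   'sink-insert-only' = 'true'
                              |)
                              |""".stripMargin)
   ```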
   



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala:
##########
@@ -334,6 +334,43 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyExecPlan(sqlQuery)
   }
 
+  @Test
+  def testSubplanReuseOnSortedView(): Unit = {
+    util.addTableSource[(Int, Long, String, String, String)]("Source", 'a, 'b, 'c, 'd, 'e)
+    val query = "SELECT * FROM Source order by d"
+    val table = util.tableEnv.sqlQuery(query)
+    // Define a sorted view.
+    util.tableEnv.createTemporaryView("SortedView", table)
+    util.tableEnv.executeSql("""
+                               |CREATE TABLE Sink1 (
+                               |   a int,
+                               |   b bigint,
+                               |   c string
+                               |) WITH (
+                               |   'connector' = 'filesystem',
+                               |   'format' = 'testcsv',
+                               |   'path' = '/tmp/test'
+                               |)
+                               |""".stripMargin)
+    util.tableEnv.executeSql("""
+                               |CREATE TABLE Sink2 (
+                               |   a int,
+                               |   b bigint,
+                               |   c string,
+                               |   d string
+                               |) WITH (
+                               |   'connector' = 'filesystem',
+                               |   'format' = 'testcsv',
+                               |   'path' = '/tmp/test'
+                               |)
+                               |""".stripMargin)
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql("INSERT INTO Sink1 select a, b, e from SortedView")

Review Comment:
   I think it's better to use the SQL from the JIRA issue to verify that we have already fixed this problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
