fsk119 commented on code in PR #20822:
URL: https://github.com/apache/flink/pull/20822#discussion_r1066546132


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistribution.scala:
##########
@@ -87,6 +87,10 @@ class FlinkRelMdDistribution private extends 
MetadataHandler[FlinkDistribution]
     }
   }
 
+  def flinkDistribution(exchange: Exchange, mq: RelMetadataQuery): 
FlinkRelDistribution = {
+    exchange.distribution.asInstanceOf[FlinkRelDistribution]

Review Comment:
   It's not safe to just cast to FlinkRelDistribution. I think we should do the following:
   
   ```
   // if exchange.distribution is an instance of FlinkRelDistribution
   // return exchange.distribution
   
   // else convert to FlinkRelDistribution according to RelDistribution.Type 
   
   
   ```



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala:
##########
@@ -334,6 +334,44 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyExecPlan(sqlQuery)
   }
 
+  @Test
+  def testSubplanReuseOnSortedView(): Unit = {
+    util.addTableSource[(Int, Long, String, String, String)]("Source", 'a, 'b, 
'c, 'd, 'e)
+    val query = "SELECT * FROM Source order by d"
+    val table = util.tableEnv.sqlQuery(query)
+    // Define a sorted view.
+    util.tableEnv.registerTable("SortedTable", table)

Review Comment:
   Use `createTemporaryView` instead. It's better not to use deprecated methods in the 
future.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.scala:
##########
@@ -248,6 +248,44 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyExecPlan(sqlQuery)
   }
 
+  @Test
+  def testSubplanReuseOnSortedView(): Unit = {
+    util.addTableSource[(Int, Long, String, String, String)]("Source", 'a, 'b, 
'c, 'd, 'e)
+    val query = "SELECT * FROM Source order by d"
+    val table = util.tableEnv.sqlQuery(query)
+    // Define a sorted view.
+    util.tableEnv.registerTable("SortedTable", table)

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/IntermediateRelTable.scala:
##########
@@ -54,4 +55,19 @@ class IntermediateRelTable(
   def this(names: JList[String], relNode: RelNode) {
     this(names, relNode, ModifyKindSet.INSERT_ONLY, false, new 
util.HashSet[ImmutableBitSet]())
   }
+
+  override def getCollationList: util.List[RelCollation] = {
+    val mq = relNode.getCluster.getMetadataQuery
+    mq.collations(relNode)
+  }
+
+  override def getDistribution: RelDistribution = {
+    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(relNode.getCluster.getMetadataQuery)

Review Comment:
   Why not use
   
   ```
       val fmq = relNode.getCluster.getMetadataQuery
       fmq.distribution(relNode)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to