fsk119 commented on code in PR #20822:
URL: https://github.com/apache/flink/pull/20822#discussion_r1066546132
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistribution.scala:
##########
@@ -87,6 +87,10 @@ class FlinkRelMdDistribution private extends
MetadataHandler[FlinkDistribution]
}
}
+ def flinkDistribution(exchange: Exchange, mq: RelMetadataQuery):
FlinkRelDistribution = {
+ exchange.distribution.asInstanceOf[FlinkRelDistribution]
Review Comment:
It's not safe to just cast to FlinkRelDistribution. I think we should
```
// if exchagne.distribution is instance of FlinkRelDistribution
// return exchange.distribution
// else convert to FlinkRelDistribution according to RelDistribution.Type
```
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala:
##########
@@ -334,6 +334,44 @@ class SubplanReuseTest extends TableTestBase {
util.verifyExecPlan(sqlQuery)
}
+ @Test
+ def testSubplanReuseOnSortedView(): Unit = {
+ util.addTableSource[(Int, Long, String, String, String)]("Source", 'a, 'b,
'c, 'd, 'e)
+ val query = "SELECT * FROM Source order by d"
+ val table = util.tableEnv.sqlQuery(query)
+ // Define a sorted view.
+ util.tableEnv.registerTable("SortedTable", table)
Review Comment:
use `createTemporaryView`. It's better we don't use deprecated method in the
future.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.scala:
##########
@@ -248,6 +248,44 @@ class SubplanReuseTest extends TableTestBase {
util.verifyExecPlan(sqlQuery)
}
+ @Test
+ def testSubplanReuseOnSortedView(): Unit = {
+ util.addTableSource[(Int, Long, String, String, String)]("Source", 'a, 'b,
'c, 'd, 'e)
+ val query = "SELECT * FROM Source order by d"
+ val table = util.tableEnv.sqlQuery(query)
+ // Define a sorted view.
+ util.tableEnv.registerTable("SortedTable", table)
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/IntermediateRelTable.scala:
##########
@@ -54,4 +55,19 @@ class IntermediateRelTable(
def this(names: JList[String], relNode: RelNode) {
this(names, relNode, ModifyKindSet.INSERT_ONLY, false, new
util.HashSet[ImmutableBitSet]())
}
+
+ override def getCollationList: util.List[RelCollation] = {
+ val mq = relNode.getCluster.getMetadataQuery
+ mq.collations(relNode)
+ }
+
+ override def getDistribution: RelDistribution = {
+ val fmq =
FlinkRelMetadataQuery.reuseOrCreate(relNode.getCluster.getMetadataQuery)
Review Comment:
Why don't use
```
val fmq = relNode.getCluster.getMetadataQuery
fmq.distribution(relNode)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]