godfreyhe commented on a change in pull request #13721:
URL: https://github.com/apache/flink/pull/13721#discussion_r512519756
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
##########
@@ -691,6 +691,18 @@ class FlinkRelMdHandlerTestBase {
(calcOfFirstRow, calcOfLastRow)
}
+ protected lazy val streamUpsertMaterialize = {
Review comment:
rename to streamChangelogNormalize
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
##########
@@ -315,5 +315,30 @@ class FlinkRelMdModifiedMonotonicityTest extends
FlinkRelMdHandlerTestBase {
assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinOnUniqueKeys))
}
+ @Test
+ def testGetRelMonotonicityOnDeduplicate(): Unit = {
+ assertEquals(
+ new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT,
NOT_MONOTONIC)),
+ mq.getRelModifiedMonotonicity(streamDeduplicateFirstRow))
+
+ assertEquals(
+ new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, CONSTANT)),
+ mq.getRelModifiedMonotonicity(streamDeduplicateLastRow))
+
+ assertEquals(
+ new RelModifiedMonotonicity(Array(
+ NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)),
+ mq.getRelModifiedMonotonicity(rowtimeDeduplicate))
+ }
+
+ @Test
+ def testGetRelMonotonicityOnUpsertMaterialize(): Unit = {
Review comment:
rename to testGetRelMonotonicityOnChangelogNormalize
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
##########
@@ -1140,48 +1140,4 @@ class JoinITCase(state: StateBackendMode) extends
StreamingWithStateTestBase(sta
val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo
Welt")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
-
- @Test
- def testJoinOnChangelogSource(): Unit = {
Review comment:
why this test is removed ?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
##########
@@ -155,7 +155,12 @@ class FlinkRelMdUniqueKeysTest extends
FlinkRelMdHandlerTestBase {
def testGetUniqueKeysOnStreamExecDeduplicate(): Unit = {
assertEquals(uniqueKeys(Array(1)),
mq.getUniqueKeys(streamDeduplicateFirstRow).toSet)
assertEquals(uniqueKeys(Array(1, 2)),
mq.getUniqueKeys(streamDeduplicateLastRow).toSet)
- assertEquals(uniqueKeys(Array(5)),
mq.getUniqueKeys(rowtimeDeduplicate).toSet)
+ assertEquals(uniqueKeys(Array(1)),
mq.getUniqueKeys(rowtimeDeduplicate).toSet)
+ }
+
+ @Test
+ def testGetUniqueKeysOnStreamExecUpsertMaterialize(): Unit = {
Review comment:
rename to testGetUniqueKeysOnStreamExecChangelogNormalize
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
##########
@@ -275,6 +275,15 @@ class FlinkRelMdColumnUniquenessTest extends
FlinkRelMdHandlerTestBase {
assertFalse(mq.areColumnsUnique(streamDeduplicateLastRow,
ImmutableBitSet.of(0, 1, 2)))
}
+ @Test
+ def testAreColumnsUniqueCountOnStreamExecUpsertMaterialize(): Unit = {
Review comment:
rename to testAreColumnsUniqueCountOnStreamExecChangelogNormalize
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]