This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f8b98cb  [FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type
f8b98cb is described below

commit f8b98cbc4e10e69ff0ea08388224f4994de474d7
Author: lzy3261944 <[email protected]>
AuthorDate: Fri Jul 24 15:20:59 2020 +0800

    [FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type
    
    Co-authored-by: luoziyu <[email protected]>
    
    This closes #12955
---
 .../table/planner/codegen/SinkCodeGenerator.scala  |  1 +
 .../stream/sql/StreamTableEnvironmentITCase.scala  | 23 ++++++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index efe906b..f46bd7c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -94,6 +94,7 @@ object SinkCodeGenerator {
         afterIndexModify = CodeGenUtils.newName("afterIndexModify")
         s"""
            |${conversion.code}
+           |${conversion.resultTerm}.setRowKind(${inputTerm}.getRowKind());
            |${classOf[RowData].getCanonicalName} $afterIndexModify = ${conversion.resultTerm};
            |""".stripMargin
       case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index 3248965..1282aba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -114,4 +114,27 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
       "(true,Person{name='Jack', age=3})")
     assertEquals(expected.sorted, sink.getResults.sorted)
   }
+
+  @Test
+  def testRetractMsgWithPojoType(): Unit = {
+    val orders = env.fromCollection(Seq(
+      new Order(1L, new ProductItem("beer", 10L), 1),
+      new Order(1L, new ProductItem("beer", 10L), 2)
+    ))
+
+    val table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)
+
+    val sink = new StringSink[(Boolean, Order)]()
+    tEnv.sqlQuery(s"""|SELECT user, product, sum(amount) as amount
+                      |FROM $table
+                      |GROUP BY user, product
+                      |""".stripMargin).toRetractStream[Order].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "(true,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+      "(false,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+      "(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})")
+    assertEquals(expected.sorted, sink.getResults.sorted)
+  }
 }

Reply via email to