This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f8b98cb [FLINK-18632][table-planner-blink] Assign the missing RowKind
when toRetractStream with POJO type
f8b98cb is described below
commit f8b98cbc4e10e69ff0ea08388224f4994de474d7
Author: lzy3261944 <[email protected]>
AuthorDate: Fri Jul 24 15:20:59 2020 +0800
[FLINK-18632][table-planner-blink] Assign the missing RowKind when
toRetractStream with POJO type
Co-authored-by: luoziyu <[email protected]>
This closes #12955
---
.../table/planner/codegen/SinkCodeGenerator.scala | 1 +
.../stream/sql/StreamTableEnvironmentITCase.scala | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index efe906b..f46bd7c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -94,6 +94,7 @@ object SinkCodeGenerator {
afterIndexModify = CodeGenUtils.newName("afterIndexModify")
s"""
|${conversion.code}
+ |${conversion.resultTerm}.setRowKind(${inputTerm}.getRowKind());
|${classOf[RowData].getCanonicalName} $afterIndexModify = ${conversion.resultTerm};
|""".stripMargin
case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index 3248965..1282aba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -114,4 +114,27 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
"(true,Person{name='Jack', age=3})")
assertEquals(expected.sorted, sink.getResults.sorted)
}
+
+ @Test
+ def testRetractMsgWithPojoType(): Unit = {
+ val orders = env.fromCollection(Seq(
+ new Order(1L, new ProductItem("beer", 10L), 1),
+ new Order(1L, new ProductItem("beer", 10L), 2)
+ ))
+
+ val table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)
+
+ val sink = new StringSink[(Boolean, Order)]()
+ tEnv.sqlQuery(s"""|SELECT user, product, sum(amount) as amount
+ |FROM $table
+ |GROUP BY user, product
+ |""".stripMargin).toRetractStream[Order].addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "(true,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+ "(false,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+ "(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})")
+ assertEquals(expected.sorted, sink.getResults.sorted)
+ }
}