This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 9d7dc33 [FLINK-17466][table-planner-blink] Fix toRetractStream doesn't work correctly with Pojo conversion class
9d7dc33 is described below
commit 9d7dc33f7c05e6b44293e5bae9dff2fac9f647cf
Author: Jark Wu <[email protected]>
AuthorDate: Thu Jun 4 12:03:06 2020 +0800
[FLINK-17466][table-planner-blink] Fix toRetractStream doesn't work correctly with Pojo conversion class
This closes #12425
---
.../table/planner/codegen/SinkCodeGenerator.scala | 10 ++++-----
.../table/planner/runtime/utils/JavaPojos.java | 24 ++++++++++++++++++++
.../stream/sql/StreamTableEnvironmentITCase.scala | 26 +++++++++++++++++++---
3 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 8ea67c0..efe906b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -54,22 +54,22 @@ object SinkCodeGenerator {
sink.getConsumedDataType,
inputRowType,
withChangeFlag)
+ val physicalTypeInfo = fromDataTypeToTypeInfo(physicalOutputType)
val outputTypeInfo = if (withChangeFlag) {
- val typeInfo = fromDataTypeToTypeInfo(physicalOutputType)
val consumedClass = sink.getConsumedDataType.getConversionClass
if (consumedClass == classOf[(_, _)]) {
- createTuple2TypeInformation(Types.BOOLEAN, typeInfo)
+ createTuple2TypeInformation(Types.BOOLEAN, physicalTypeInfo)
} else if (consumedClass == classOf[JTuple2[_, _]]) {
- new TupleTypeInfo(Types.BOOLEAN, typeInfo)
+ new TupleTypeInfo(Types.BOOLEAN, physicalTypeInfo)
}
} else {
- fromDataTypeToTypeInfo(physicalOutputType)
+ physicalTypeInfo
}
val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
var afterIndexModify = inputTerm
- val fieldIndexProcessCode = outputTypeInfo match {
+ val fieldIndexProcessCode = physicalTypeInfo match {
case pojo: PojoTypeInfo[_] =>
val mapping = pojo.getFieldNames.map { name =>
val index = inputRowType.getFieldIndex(name)
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java
index a64fe05..86c8e51 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java
@@ -123,4 +123,28 @@ public class JavaPojos {
'}';
}
}
+
+ /**
+ * A POJO class.
+ */
+ public static class Person {
+ public String name;
+ public int age;
+
+ public Person() {
+ }
+
+ public Person(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
+
+ @Override
+ public String toString() {
+ return "Person{" +
+ "name='" + name + '\'' +
+ ", age=" + age +
+ '}';
+ }
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index 115e5bb..3248965 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -22,9 +22,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, ProductItem}
+import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, Person, ProductItem}
import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, StringSink}
-
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -81,7 +80,7 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
))
// register DataStream as Table
- tEnv.createTemporaryView("devices", devices,'deviceId, 'deviceName, 'metrics)
+ tEnv.createTemporaryView("devices", devices, 'deviceId, 'deviceName, 'metrics)
val result = tEnv.sqlQuery("SELECT * FROM devices WHERE deviceId >= 2")
val sink = new StringSink[Device]()
@@ -94,4 +93,25 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
"Device{deviceId=3, deviceName='device3', metrics={B=20}}")
assertEquals(expected.sorted, sink.getResults.sorted)
}
+
+ @Test
+ def testToRetractStreamWithPojoType(): Unit = {
+ val persons = env.fromCollection(Seq(
+ new Person("bob", 1),
+ new Person("Liz", 2),
+ new Person("Jack", 3)
+ ))
+
+ tEnv.createTemporaryView("person", persons)
+ val sink = new StringSink[(Boolean, Person)]()
+ // reorder the fields (fields order in PojoTypeInfo is [age, name])
+ tEnv.sqlQuery("select name, age from person").toRetractStream[Person].addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "(true,Person{name='bob', age=1})",
+ "(true,Person{name='Liz', age=2})",
+ "(true,Person{name='Jack', age=3})")
+ assertEquals(expected.sorted, sink.getResults.sorted)
+ }
}