This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a412840bc077d471813c84d0387a664fd95dae27
Author: Jark Wu <[email protected]>
AuthorDate: Thu Jun 4 12:03:06 2020 +0800

    [FLINK-17466][table-planner-blink] Fix toRetractStream doesn't work correctly with Pojo conversion class
    
    This closes #12425
---
 .../table/planner/codegen/SinkCodeGenerator.scala  | 10 ++++-----
 .../table/planner/runtime/utils/JavaPojos.java     | 24 ++++++++++++++++++++
 .../stream/sql/StreamTableEnvironmentITCase.scala  | 26 +++++++++++++++++++---
 3 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 2cc9a65..7999f34 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -56,22 +56,22 @@ object SinkCodeGenerator {
       sink.getConsumedDataType,
       inputRowType,
       withChangeFlag)
+    val physicalTypeInfo = fromDataTypeToTypeInfo(physicalOutputType)
 
     val outputTypeInfo = if (withChangeFlag) {
-      val typeInfo = fromDataTypeToTypeInfo(physicalOutputType)
       val consumedClass = sink.getConsumedDataType.getConversionClass
       if (consumedClass == classOf[(_, _)]) {
-        createTuple2TypeInformation(Types.BOOLEAN, typeInfo)
+        createTuple2TypeInformation(Types.BOOLEAN, physicalTypeInfo)
       } else if (consumedClass == classOf[JTuple2[_, _]]) {
-        new TupleTypeInfo(Types.BOOLEAN, typeInfo)
+        new TupleTypeInfo(Types.BOOLEAN, physicalTypeInfo)
       }
     } else {
-      fromDataTypeToTypeInfo(physicalOutputType)
+      physicalTypeInfo
     }
 
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     var afterIndexModify = inputTerm
-    val fieldIndexProcessCode = outputTypeInfo match {
+    val fieldIndexProcessCode = physicalTypeInfo match {
       case pojo: PojoTypeInfo[_] =>
         val mapping = pojo.getFieldNames.map { name =>
           val index = inputRowType.getFieldIndex(name)
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java
index a64fe05..86c8e51 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java
@@ -123,4 +123,28 @@ public class JavaPojos {
                                '}';
                }
        }
+
+       /**
+        * A POJO class.
+        */
+       public static class Person {
+               public String name;
+               public int age;
+
+               public Person() {
+               }
+
+               public Person(String name, int age) {
+                       this.name = name;
+                       this.age = age;
+               }
+
+               @Override
+               public String toString() {
+                       return "Person{" +
+                               "name='" + name + '\'' +
+                               ", age=" + age +
+                               '}';
+               }
+       }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index d26c864..d42048a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -21,9 +21,8 @@ package org.apache.flink.table.planner.runtime.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, ProductItem}
+import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, Person, ProductItem}
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, StringSink}
-
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -80,7 +79,7 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
     ))
 
     // register DataStream as Table
-    tEnv.createTemporaryView("devices", devices,'deviceId, 'deviceName, 'metrics)
+    tEnv.createTemporaryView("devices", devices, 'deviceId, 'deviceName, 'metrics)
 
     val result = tEnv.sqlQuery("SELECT * FROM devices WHERE deviceId >= 2")
     val sink = new StringSink[Device]()
@@ -93,4 +92,25 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
       "Device{deviceId=3, deviceName='device3', metrics={B=20}}")
     assertEquals(expected.sorted, sink.getResults.sorted)
   }
+
+  @Test
+  def testToRetractStreamWithPojoType(): Unit = {
+    val persons = env.fromCollection(Seq(
+      new Person("bob", 1),
+      new Person("Liz", 2),
+      new Person("Jack", 3)
+    ))
+
+    tEnv.createTemporaryView("person", persons)
+    val sink = new StringSink[(Boolean, Person)]()
+    // reorder the fields (fields order in PojoTypeInfo is [age, name])
+    tEnv.sqlQuery("select name, age from person").toRetractStream[Person].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "(true,Person{name='bob', age=1})",
+      "(true,Person{name='Liz', age=2})",
+      "(true,Person{name='Jack', age=3})")
+    assertEquals(expected.sorted, sink.getResults.sorted)
+  }
 }

Reply via email to