This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a412840bc077d471813c84d0387a664fd95dae27 Author: Jark Wu <[email protected]> AuthorDate: Thu Jun 4 12:03:06 2020 +0800 [FLINK-17466][table-planner-blink] Fix toRetractStream doesn't work correctly with Pojo conversion class This closes #12425 --- .../table/planner/codegen/SinkCodeGenerator.scala | 10 ++++----- .../table/planner/runtime/utils/JavaPojos.java | 24 ++++++++++++++++++++ .../stream/sql/StreamTableEnvironmentITCase.scala | 26 +++++++++++++++++++--- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala index 2cc9a65..7999f34 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala @@ -56,22 +56,22 @@ object SinkCodeGenerator { sink.getConsumedDataType, inputRowType, withChangeFlag) + val physicalTypeInfo = fromDataTypeToTypeInfo(physicalOutputType) val outputTypeInfo = if (withChangeFlag) { - val typeInfo = fromDataTypeToTypeInfo(physicalOutputType) val consumedClass = sink.getConsumedDataType.getConversionClass if (consumedClass == classOf[(_, _)]) { - createTuple2TypeInformation(Types.BOOLEAN, typeInfo) + createTuple2TypeInformation(Types.BOOLEAN, physicalTypeInfo) } else if (consumedClass == classOf[JTuple2[_, _]]) { - new TupleTypeInfo(Types.BOOLEAN, typeInfo) + new TupleTypeInfo(Types.BOOLEAN, physicalTypeInfo) } } else { - fromDataTypeToTypeInfo(physicalOutputType) + physicalTypeInfo } val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM var afterIndexModify = inputTerm - val fieldIndexProcessCode = outputTypeInfo match { + val fieldIndexProcessCode = physicalTypeInfo match { case pojo: PojoTypeInfo[_] => val mapping = pojo.getFieldNames.map { name => val index = inputRowType.getFieldIndex(name) diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java index a64fe05..86c8e51 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaPojos.java @@ -123,4 +123,28 @@ public class JavaPojos { '}'; } } + + /** + * A POJO class. + */ + public static class Person { + public String name; + public int age; + + public Person() { + } + + public Person(String name, int age) { + this.name = name; + this.age = age; + } + + @Override + public String toString() { + return "Person{" + + "name='" + name + '\'' + + ", age=" + age + + '}'; + } + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala index d26c864..d42048a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala @@ -21,9 +21,8 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api.scala._ -import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, ProductItem} +import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, Person, ProductItem} import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, StringSink} - import org.junit.Assert.assertEquals import org.junit.Test @@ -80,7 +79,7 @@ class StreamTableEnvironmentITCase extends StreamingTestBase { )) // register DataStream as Table - tEnv.createTemporaryView("devices", devices,'deviceId, 'deviceName, 'metrics) + tEnv.createTemporaryView("devices", devices, 'deviceId, 'deviceName, 'metrics) val result = tEnv.sqlQuery("SELECT * FROM devices WHERE deviceId >= 2") val sink = new StringSink[Device]() @@ -93,4 +92,25 @@ class StreamTableEnvironmentITCase extends StreamingTestBase { "Device{deviceId=3, deviceName='device3', metrics={B=20}}") assertEquals(expected.sorted, sink.getResults.sorted) } + + @Test + def testToRetractStreamWithPojoType(): Unit = { + val persons = env.fromCollection(Seq( + new Person("bob", 1), + new Person("Liz", 2), + new Person("Jack", 3) + )) + + tEnv.createTemporaryView("person", persons) + val sink = new StringSink[(Boolean, Person)]() + // reorder the fields (fields order in PojoTypeInfo is [age, name]) + tEnv.sqlQuery("select name, age from person").toRetractStream[Person].addSink(sink) + env.execute() + + val expected = List( + "(true,Person{name='bob', age=1})", + "(true,Person{name='Liz', age=2})", + "(true,Person{name='Jack', age=3})") + assertEquals(expected.sorted, sink.getResults.sorted) + } }
