This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 55f399c5b43f5fa39484c01a5f1d305e8b262209 Author: Jark Wu <[email protected]> AuthorDate: Tue Jul 30 18:17:01 2019 +0800 [FLINK-13290][table-planner-blink] SinkCodeGenerator should not compare row type field names --- .../table/planner/codegen/SinkCodeGenerator.scala | 13 +++++----- .../runtime/stream/table/TableSinkITCase.scala | 30 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala index e972ad7..21ea4f6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala @@ -34,6 +34,7 @@ import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.areTypesCompatible import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.types.Row @@ -108,11 +109,10 @@ object SinkCodeGenerator { val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM var afterIndexModify = inputTerm val fieldIndexProcessCode = - if (getCompositeTypes(convertOutputType).map(fromTypeInfoToLogicalType) sameElements - inputTypeInfo.getFieldTypes.map(fromTypeInfoToLogicalType)) { + if (!resultType.isInstanceOf[PojoTypeInfo[_]]) { "" } else { - // field index change (pojo) + // field index may change (pojo) val mapping = convertOutputType match { case ct: CompositeType[_] => ct.getFieldNames.map { name => @@ -223,9 +223,10 @@ object SinkCodeGenerator { case (fieldTypeInfo, i) => val requestedTypeInfo = tt.getTypeAt(i) validateFieldType(requestedTypeInfo) - if (fromTypeInfoToLogicalType(fieldTypeInfo) != - fromTypeInfoToLogicalType(requestedTypeInfo) && - !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) { + if (!areTypesCompatible( + fromTypeInfoToLogicalType(fieldTypeInfo), + fromTypeInfoToLogicalType(requestedTypeInfo)) && + !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) { val fieldNames = tt.getFieldNames throw new TableException(s"Result field '${fieldNames(i)}' does not match requested" + s" type. Requested: $requestedTypeInfo; Actual: $fieldTypeInfo") diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala index 0ab3480..c013308 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala @@ -151,6 +151,36 @@ class TableSinkITCase extends AbstractTestBase { assertEquals(expected, result) } + + @Test + def testAppendSinkWithNestedRow(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) + + val t = env.fromCollection(smallTupleData3) + .toTable(tEnv, 'id, 'num, 'text) + tEnv.registerTable("src", t) + + val sink = new TestingAppendTableSink() + tEnv.registerTableSink( + "appendSink", + sink.configure( + Array[String]("t", "item"), + Array[TypeInformation[_]](Types.INT(), Types.ROW(Types.LONG, Types.STRING())))) + + tEnv.sqlUpdate("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src") + + env.execute() + + val result = sink.getAppendResults.sorted + val expected = List( + "1,1,Hi", + "2,2,Hello", + "3,2,Hello world").sorted + assertEquals(expected, result) + } + @Test def testAppendSinkOnAppendTableForInnerJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment
