This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c2fc19  convertToRowTypeInfo() return field types and name
     new 84a552d  Merge pull request #8 from liyubin117/convert-return-name
7c2fc19 is described below

commit 7c2fc19b3184a892a0904cbc2a553218093b28d9
Author: Yubin Li <[email protected]>
AuthorDate: Mon Sep 27 10:49:35 2021 +0800

    convertToRowTypeInfo() return field types and name
---
 .../apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 9839888..8b13eb4 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -186,7 +186,7 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
                 isDynamicTagIncluded,
                 writeKeysToBody,
                 keyColumns,
-                convertToRowTypeInfo(schema.toRowDataType()),
+                convertToRowTypeInfo(schema.toRowDataType(), schema.getFieldNames()),
                 schema.getFieldDataTypes(),
                 metadataKeys.size() > 0,
                 metadataPositions);
@@ -199,12 +199,12 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
         return producerProps;
     }
 
-    protected static RowTypeInfo convertToRowTypeInfo(DataType fieldsDataType) {
+    protected static RowTypeInfo convertToRowTypeInfo(DataType fieldsDataType, String[] fieldNames) {
         final TypeInformation<?>[] fieldTypes =
                 fieldsDataType.getChildren().stream()
                         .map(LegacyTypeInfoDataTypeConverter::toLegacyTypeInfo)
                         .toArray(TypeInformation[]::new);
-        return new RowTypeInfo(fieldTypes);
+        return new RowTypeInfo(fieldTypes, fieldNames);
     }
 
     // --------------------------------------------------------------------------------------------

Reply via email to