This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a24bd85 [INLONG-4815][Sort] Supporting field type cast when sinking 
data to HBase (#4856)
40a24bd85 is described below

commit 40a24bd85f987394be32993f14a6ed7e428ae081
Author: Xin Gong <[email protected]>
AuthorDate: Mon Jul 4 19:34:31 2022 +0800

    [INLONG-4815][Sort] Supporting field type cast when sinking data to HBase 
(#4856)
---
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  7 +++-
 .../sort/parser/DataTypeConvertSqlParseTest.java   | 47 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 6b438b309..f818a2b61 100644
--- 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -534,8 +534,11 @@ public class FlinkSqlParser implements Parser {
         for (Map.Entry<String, List<FieldRelation>> entry : 
columnFamilyMapFields.entrySet()) {
             StringBuilder fieldAppend = new StringBuilder(" ROW(");
             for (FieldRelation fieldRelation : entry.getValue()) {
-                FieldInfo fieldInfo = (FieldInfo) 
fieldRelation.getInputField();
-                fieldAppend.append(fieldInfo.getName()).append(",");
+                FieldInfo outputField = fieldRelation.getOutputField();
+                FieldInfo inputField = (FieldInfo) 
fieldRelation.getInputField();
+                String targetType = 
TableFormatUtils.deriveLogicalType(outputField.getFormatInfo()).asSummaryString();
+                fieldAppend.append(" 
CAST(").append(inputField.format()).append(" AS ")
+                        .append(targetType).append(" ) ").append(",");
             }
             if (fieldAppend.length() > 0) {
                 fieldAppend.delete(fieldAppend.lastIndexOf(","), 
fieldAppend.length());
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
index 60180ed68..c7dffd0ff 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
@@ -35,6 +35,7 @@ import 
org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
@@ -140,4 +141,50 @@ public class DataTypeConvertSqlParseTest extends 
AbstractTestBase {
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
+
+    /**
+     * build hbase load node
+     *
+     * @return hbase load node
+     */
+    private HbaseLoadNode buildHbaseLoadNode() {
+        return new HbaseLoadNode("2", "test_hbase",
+                Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo()), 
new FieldInfo("cf:age",
+                        new StringFormatInfo())),
+                Arrays.asList(new FieldRelation(new FieldInfo("id", new 
LongFormatInfo()),
+                                new FieldInfo("cf:id", new 
StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new 
IntFormatInfo()),
+                                new FieldInfo("cf:age", new 
StringFormatInfo()))), null, null, 1, null, "mytable",
+                "default",
+                "localhost:2181", "MD5(CAST(`id` as String))", null, null, 
null, null);
+    }
+
+    /**
+     * Test data type convert implicitly for sinking data into HBase
+     *
+     * @throws Exception The exception may throws when executing
+     */
+    @Test
+    public void testHBaseDataTypeConvertSqlParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildKafkaExtractNode();
+        Node outputNode = buildHbaseLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1",
+                Arrays.asList(inputNode, outputNode),
+                Arrays.asList(
+                        buildNodeRelation(Collections.singletonList(inputNode),
+                                Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
 }

Reply via email to