This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 40a24bd85 [INLONG-4815][Sort] Supporting field type cast when sinking
data to HBase (#4856)
40a24bd85 is described below
commit 40a24bd85f987394be32993f14a6ed7e428ae081
Author: Xin Gong <[email protected]>
AuthorDate: Mon Jul 4 19:34:31 2022 +0800
[INLONG-4815][Sort] Supporting field type cast when sinking data to HBase
(#4856)
---
.../inlong/sort/parser/impl/FlinkSqlParser.java | 7 +++-
.../sort/parser/DataTypeConvertSqlParseTest.java | 47 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 2 deletions(-)
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 6b438b309..f818a2b61 100644
---
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -534,8 +534,11 @@ public class FlinkSqlParser implements Parser {
for (Map.Entry<String, List<FieldRelation>> entry :
columnFamilyMapFields.entrySet()) {
StringBuilder fieldAppend = new StringBuilder(" ROW(");
for (FieldRelation fieldRelation : entry.getValue()) {
- FieldInfo fieldInfo = (FieldInfo)
fieldRelation.getInputField();
- fieldAppend.append(fieldInfo.getName()).append(",");
+ FieldInfo outputField = fieldRelation.getOutputField();
+ FieldInfo inputField = (FieldInfo)
fieldRelation.getInputField();
+ String targetType =
TableFormatUtils.deriveLogicalType(outputField.getFormatInfo()).asSummaryString();
+ fieldAppend.append("
CAST(").append(inputField.format()).append(" AS ")
+ .append(targetType).append(" ) ").append(",");
}
if (fieldAppend.length() > 0) {
fieldAppend.delete(fieldAppend.lastIndexOf(","),
fieldAppend.length());
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
index 60180ed68..c7dffd0ff 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
@@ -35,6 +35,7 @@ import
org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
@@ -140,4 +141,50 @@ public class DataTypeConvertSqlParseTest extends
AbstractTestBase {
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
+
+ /**
+ * Build an HBase load node for the test pipeline
+ *
+ * @return the HBase load node
+ */
+ private HbaseLoadNode buildHbaseLoadNode() {
+ return new HbaseLoadNode("2", "test_hbase",
+ Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo()),
new FieldInfo("cf:age",
+ new StringFormatInfo())),
+ Arrays.asList(new FieldRelation(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("cf:id", new
StringFormatInfo())),
+ new FieldRelation(new FieldInfo("age", new
IntFormatInfo()),
+ new FieldInfo("cf:age", new
StringFormatInfo()))), null, null, 1, null, "mytable",
+ "default",
+ "localhost:2181", "MD5(CAST(`id` as String))", null, null,
null, null);
+ }
+
+ /**
+ * Test implicit data type conversion when sinking data into HBase
+ *
+ * @throws Exception The exception that may be thrown during execution
+ */
+ @Test
+ public void testHBaseDataTypeConvertSqlParse() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildKafkaExtractNode();
+ Node outputNode = buildHbaseLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1",
+ Arrays.asList(inputNode, outputNode),
+ Arrays.asList(
+ buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
}