This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9c40c95e72 [INLONG-10130][SDK] Transform SQL support string concat
function (#10143)
9c40c95e72 is described below
commit 9c40c95e72ee520242ca41a656ca9400009e94ca
Author: 卢春亮 <[email protected]>
AuthorDate: Tue May 7 21:36:49 2024 +0800
[INLONG-10130][SDK] Transform SQL support string concat function (#10143)
---
.../transform/process/function/ConcatFunction.java | 70 ++++++++++++++++++++++
.../transform/process/function/NowFunction.java | 48 +++++++++++++++
.../transform/process/operator/OperatorTools.java | 11 ++++
.../transform/process/TestTransformProcessor.java | 41 +++++++++++++
4 files changed, 170 insertions(+)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
new file mode 100644
index 0000000000..2bfe7a5588
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ConcatFunction
+ *
+ */
+public class ConcatFunction implements ValueParser {
+
+ private List<ValueParser> nodeList;
+
+ /**
+ * Constructor
+ * @param expr
+ */
+ public ConcatFunction(Function expr) {
+ if (expr.getParameters() == null) {
+ this.nodeList = new ArrayList<>();
+ } else {
+ List<Expression> params = expr.getParameters().getExpressions();
+ nodeList = new ArrayList<>(params.size());
+ for (Expression param : params) {
+ ValueParser node = OperatorTools.buildParser(param);
+ nodeList.add(node);
+ }
+ }
+ }
+
+ /**
+ * parse
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex) {
+ StringBuilder builder = new StringBuilder();
+ for (ValueParser node : nodeList) {
+ builder.append(node.parse(sourceData, rowIndex));
+ }
+ return builder.toString();
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
new file mode 100644
index 0000000000..930a09af05
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+/**
+ * NowFunction
+ *
+ */
+public class NowFunction implements ValueParser {
+
+ /**
+ * Constructor
+ * @param expr
+ */
+ public NowFunction(Function expr) {
+ }
+
+ /**
+ * parse
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex) {
+ return String.valueOf(System.currentTimeMillis());
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 361ce1aec3..7f73e11d06 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.process.operator;
+import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
+import org.apache.inlong.sdk.transform.process.function.NowFunction;
import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
@@ -108,6 +110,15 @@ public class OperatorTools {
return new ColumnParser((Function) expr);
} else {
// TODO
+ Function func = (Function) expr;
+ switch (func.getName()) {
+ case "concat":
+ return new ConcatFunction(func);
+ case "now":
+ return new NowFunction(func);
+ default:
+ return new ColumnParser(func);
+ }
}
}
return null;
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index b508f8f2aa..7e46e28105 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -295,4 +295,45 @@ public class TestTransformProcessor {
e.printStackTrace();
}
}
+
+ @Test
+ public void testPb2CsvForConcat() {
+ try {
+ List<FieldInfo> fields = this.getTestFieldList();
+ String transformBase64 = this.getPbTestDescription();
+ SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
+ SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,"
+ +
"concat($root.sid,$root.packageID,$child.msgTime,$child.msg)
msg,$root.msgs.msgTime.msg from source";
+ TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ // case1
+ TransformProcessor processor = new TransformProcessor(config);
+ byte[] srcBytes = this.getPbTestData();
+ List<String> output = processor.transform(srcBytes, new
HashMap<>());
+ Assert.assertTrue(output.size() == 2);
+ Assert.assertEquals(output.get(0),
"sid|1|1713243918000|sid11713243918000msgValue4");
+ Assert.assertEquals(output.get(1),
"sid|1|1713243918002|sid11713243918002msgValue42");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testPb2CsvForNow() {
+ try {
+ List<FieldInfo> fields = this.getTestFieldList();
+ String transformBase64 = this.getPbTestDescription();
+ SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
+ SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ String transformSql = "select now() from source";
+ TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ // case1
+ TransformProcessor processor = new TransformProcessor(config);
+ byte[] srcBytes = this.getPbTestData();
+ List<String> output = processor.transform(srcBytes, new
HashMap<>());
+ Assert.assertTrue(output.size() == 2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}