This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c40c95e72 [INLONG-10130][SDK] Transform SQL support string concat 
function (#10143)
9c40c95e72 is described below

commit 9c40c95e72ee520242ca41a656ca9400009e94ca
Author: 卢春亮 <[email protected]>
AuthorDate: Tue May 7 21:36:49 2024 +0800

    [INLONG-10130][SDK] Transform SQL support string concat function (#10143)
---
 .../transform/process/function/ConcatFunction.java | 70 ++++++++++++++++++++++
 .../transform/process/function/NowFunction.java    | 48 +++++++++++++++
 .../transform/process/operator/OperatorTools.java  | 11 ++++
 .../transform/process/TestTransformProcessor.java  | 41 +++++++++++++
 4 files changed, 170 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
new file mode 100644
index 0000000000..2bfe7a5588
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ConcatFunction
+ * 
+ */
+public class ConcatFunction implements ValueParser {
+
+    private List<ValueParser> nodeList;
+
+    /**
+     * Constructor
+     * @param expr
+     */
+    public ConcatFunction(Function expr) {
+        if (expr.getParameters() == null) {
+            this.nodeList = new ArrayList<>();
+        } else {
+            List<Expression> params = expr.getParameters().getExpressions();
+            nodeList = new ArrayList<>(params.size());
+            for (Expression param : params) {
+                ValueParser node = OperatorTools.buildParser(param);
+                nodeList.add(node);
+            }
+        }
+    }
+
+    /**
+     * parse
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        StringBuilder builder = new StringBuilder();
+        for (ValueParser node : nodeList) {
+            builder.append(node.parse(sourceData, rowIndex));
+        }
+        return builder.toString();
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
new file mode 100644
index 0000000000..930a09af05
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+/**
+ * NowFunction
+ * 
+ */
+public class NowFunction implements ValueParser {
+
+    /**
+     * Constructor
+     * @param expr
+     */
+    public NowFunction(Function expr) {
+    }
+
+    /**
+     * parse
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return String.valueOf(System.currentTimeMillis());
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 361ce1aec3..7f73e11d06 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.process.operator;
 
+import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
+import org.apache.inlong.sdk.transform.process.function.NowFunction;
 import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
 import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
 import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
@@ -108,6 +110,15 @@ public class OperatorTools {
                 return new ColumnParser((Function) expr);
             } else {
                 // TODO
+                Function func = (Function) expr;
+                switch (func.getName()) {
+                    case "concat":
+                        return new ConcatFunction(func);
+                    case "now":
+                        return new NowFunction(func);
+                    default:
+                        return new ColumnParser(func);
+                }
             }
         }
         return null;
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index b508f8f2aa..7e46e28105 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -295,4 +295,45 @@ public class TestTransformProcessor {
             e.printStackTrace();
         }
     }
+
+    @Test
+    public void testPb2CsvForConcat() {
+        try {
+            List<FieldInfo> fields = this.getTestFieldList();
+            String transformBase64 = this.getPbTestDescription();
+            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,"
+                    + 
"concat($root.sid,$root.packageID,$child.msgTime,$child.msg) 
msg,$root.msgs.msgTime.msg from source";
+            TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+            // case1
+            TransformProcessor processor = new TransformProcessor(config);
+            byte[] srcBytes = this.getPbTestData();
+            List<String> output = processor.transform(srcBytes, new 
HashMap<>());
+            Assert.assertTrue(output.size() == 2);
+            Assert.assertEquals(output.get(0), 
"sid|1|1713243918000|sid11713243918000msgValue4");
+            Assert.assertEquals(output.get(1), 
"sid|1|1713243918002|sid11713243918002msgValue42");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testPb2CsvForNow() {
+        try {
+            List<FieldInfo> fields = this.getTestFieldList();
+            String transformBase64 = this.getPbTestDescription();
+            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select now() from source";
+            TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+            // case1
+            TransformProcessor processor = new TransformProcessor(config);
+            byte[] srcBytes = this.getPbTestData();
+            List<String> output = processor.transform(srcBytes, new 
HashMap<>());
+            Assert.assertTrue(output.size() == 2);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }

Reply via email to