This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a27664530 [INLONG-10618][SDK] Transform SQL support common 
functions(Including substring, locate, to_date and date_format) (#10744)
8a27664530 is described below

commit 8a2766453099e281e3a4d9f490207737a9f7cda1
Author: LeeWY <[email protected]>
AuthorDate: Tue Aug 6 19:03:46 2024 +0800

    [INLONG-10618][SDK] Transform SQL support common functions(Including 
substring, locate, to_date and date_format) (#10744)
    
    * [INLONG-10618][SDK] Transform SQL support common functions(Including 
substring, locate, to_date and date_format)
    
    * [INLONG-10618][SDK] Fix unit test failure, set time zone to Shanghai
    
    * [INLONG-10618][SDK] Make the DateTimeFormatter object reusable to avoid 
creating multiple identical DateTimeFormatter objects.
    
    * [INLONG-10618][SDK] Make the SimpleDateFormat object reusable to avoid 
creating multiple identical SimpleDateFormat objects.
    
    ---------
    
    Co-authored-by: jameswyli <[email protected]>
---
 .../process/function/DateFormatFunction.java       |  90 +++++++++++++++
 .../transform/process/function/LocateFunction.java |  87 +++++++++++++++
 .../process/function/SubstringFunction.java        |  81 ++++++++++++++
 .../transform/process/function/ToDateFunction.java |  93 ++++++++++++++++
 .../transform/process/operator/OperatorTools.java  |   8 ++
 .../TestTransformStringFunctionsProcessor.java     | 121 ++++++++++++++++++++
 .../TestTransformTemporalFunctionsProcessor.java   | 123 +++++++++++++++++++++
 7 files changed, 603 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java
new file mode 100644
index 0000000000..9e233ff882
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DateFormatFunction
+ * description: date_format(timestamp,format)--converts timestamp(in seconds) to a value of string in the format
+ * specified by the date format string. The format string is compatible with Java's SimpleDateFormat
+ * - returns NULL if any of arguments is NULL
+ */
+public class DateFormatFunction implements ValueParser {
+
+    /**
+     * Formatters cached per pattern and shared by every instance of this function.
+     * SimpleDateFormat is not thread-safe, so a cached instance must only be used while
+     * holding its own monitor (see {@link #parse}).
+     * NOTE(review): the cache is unbounded; if patterns come from data columns it can keep growing.
+     */
+    private static final Map<String, SimpleDateFormat> SIMPLE_DATE_FORMATS = new ConcurrentHashMap<>();
+
+    private final ValueParser timestampParser;
+    private final ValueParser formatParser;
+
+    /**
+     * Constructor
+     *
+     * @param expr the date_format call; parameter 0 is the timestamp, parameter 1 is the format pattern
+     */
+    public DateFormatFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        timestampParser = OperatorTools.buildParser(expressions.get(0));
+        formatParser = OperatorTools.buildParser(expressions.get(1));
+    }
+
+    /**
+     * parse
+     *
+     * @param sourceData the decoded source record
+     * @param rowIndex the index of the row being evaluated
+     * @param context the transform context
+     * @return the formatted date string, or null if the timestamp or the format evaluates to null
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object timestampObj = timestampParser.parse(sourceData, rowIndex, context);
+        Object formatObj = formatParser.parse(sourceData, rowIndex, context);
+        // same NULL contract as LocateFunction: a NULL argument yields NULL
+        if (timestampObj == null || formatObj == null) {
+            return null;
+        }
+        BigDecimal timestamp = OperatorTools.parseBigDecimal(timestampObj);
+        String format = OperatorTools.parseString(formatObj);
+        SimpleDateFormat sdf = getSimpleDateFormat(format);
+        // the timestamp is in seconds, multiply 1000 to get milliseconds
+        Date date = new Date(timestamp.longValue() * 1000);
+        // the formatter is shared between threads and mutates internal state while formatting
+        synchronized (sdf) {
+            return sdf.format(date);
+        }
+    }
+
+    /**
+     * getSimpleDateFormat
+     *
+     * @param pattern the SimpleDateFormat pattern
+     * @return the cached formatter for the pattern, created on first use
+     */
+    private SimpleDateFormat getSimpleDateFormat(String pattern) {
+        // atomic create-and-publish; an invalid pattern still throws IllegalArgumentException and is not cached
+        return SIMPLE_DATE_FORMATS.computeIfAbsent(pattern, SimpleDateFormat::new);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java
new file mode 100644
index 0000000000..e300815eec
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * LocateFunction
+ * description: locate(string1, string2[, integer])
+ * - returns the position (starting from 1) of the first occurrence of string1 in string2, searching from
+ *   position integer when it is given
+ * - returns 0 if not found, or if integer is less than 1
+ * - returns NULL if any of arguments is NULL
+ */
+public class LocateFunction implements ValueParser {
+
+    private final ValueParser stringParser1;
+    private final ValueParser stringParser2;
+    // null when the optional start position argument is absent
+    private final ValueParser startPositionParser;
+
+    /**
+     * Constructor
+     *
+     * @param expr the locate call; parameter 0 is the string to search for, parameter 1 is the string to search
+     *        in, and the optional parameter 2 is the start position
+     */
+    public LocateFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        // Determine the number of arguments and build parser
+        stringParser1 = OperatorTools.buildParser(expressions.get(0));
+        stringParser2 = OperatorTools.buildParser(expressions.get(1));
+        startPositionParser = expressions.size() == 3 ? OperatorTools.buildParser(expressions.get(2)) : null;
+    }
+
+    /**
+     * parse
+     *
+     * @param sourceData the decoded source record
+     * @param rowIndex the index of the row being evaluated
+     * @param context the transform context
+     * @return the position of the match as an Integer, 0 when there is no match, or null if any argument is null
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context);
+        Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context);
+        // If any of arguments is null, return null
+        if (stringObj1 == null || stringObj2 == null) {
+            return null;
+        }
+        String str1 = OperatorTools.parseString(stringObj1);
+        String str2 = OperatorTools.parseString(stringObj2);
+        if (startPositionParser == null) {
+            // indexOf is 0-based and returns -1 when absent, so +1 gives the position or 0
+            return str2.indexOf(str1) + 1;
+        }
+        Object startPositionObj = startPositionParser.parse(sourceData, rowIndex, context);
+        // if startPositionObj is null, return null
+        if (startPositionObj == null) {
+            return null;
+        }
+        int startPosition = OperatorTools.parseBigDecimal(startPositionObj).intValue();
+        // String.indexOf treats a negative fromIndex as 0, which would report a match for an invalid
+        // start position; positions start from 1, so anything smaller cannot match
+        if (startPosition < 1) {
+            return 0;
+        }
+        return str2.indexOf(str1, startPosition - 1) + 1;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java
new file mode 100644
index 0000000000..063686aa55
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * SubstringFunction
+ * description: substring(string FROM INT1 [ FOR INT2 ])--returns a substring of STRING starting from position INT1
+ * with length INT2 (to the end by default)
+ * - positions start from 1; the part of the requested range that lies outside the string is ignored
+ * - returns an empty string if the requested range does not overlap the string or INT2 is not positive
+ * - returns NULL if any of arguments is NULL
+ */
+public class SubstringFunction implements ValueParser {
+
+    private final ValueParser stringParser;
+    private final ValueParser startPositionParser;
+    // null when the optional length argument is absent
+    private final ValueParser lengthParser;
+
+    /**
+     * Constructor
+     *
+     * @param expr the substring call; parameter 0 is the string, parameter 1 is the start position and the
+     *        optional parameter 2 is the length
+     */
+    public SubstringFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        // Determine the number of arguments and build parser
+        stringParser = OperatorTools.buildParser(expressions.get(0));
+        startPositionParser = OperatorTools.buildParser(expressions.get(1));
+        lengthParser = expressions.size() == 3 ? OperatorTools.buildParser(expressions.get(2)) : null;
+    }
+
+    /**
+     * parse
+     *
+     * @param sourceData the decoded source record
+     * @param rowIndex the index of the row being evaluated
+     * @param context the transform context
+     * @return the selected substring (possibly empty), or null if any argument is null
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object stringObj = stringParser.parse(sourceData, rowIndex, context);
+        Object startPositionObj = startPositionParser.parse(sourceData, rowIndex, context);
+        // same NULL contract as LocateFunction: a NULL argument yields NULL
+        if (stringObj == null || startPositionObj == null) {
+            return null;
+        }
+        String str = OperatorTools.parseString(stringObj);
+        int start = OperatorTools.parseBigDecimal(startPositionObj).intValue();
+        if (start > str.length()) {
+            return "";
+        }
+        // 0-based begin index; long arithmetic so that extreme start values cannot overflow.
+        // start <= str.length() here, so the clamped value always fits in an int.
+        int beginIndex = (int) Math.max((long) start - 1, 0L);
+        if (lengthParser == null) {
+            return str.substring(beginIndex);
+        }
+        Object lengthObj = lengthParser.parse(sourceData, rowIndex, context);
+        if (lengthObj == null) {
+            return null;
+        }
+        int len = OperatorTools.parseBigDecimal(lengthObj).intValue();
+        if (len <= 0) {
+            return "";
+        }
+        // exclusive end of the requested range, clamped to the end of the string
+        long endIndex = Math.min((long) start - 1 + len, (long) str.length());
+        // a start far enough below 1 puts the whole range before the string; nothing to return
+        if (endIndex <= beginIndex) {
+            return "";
+        }
+        return str.substring(beginIndex, (int) endIndex);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java
new file mode 100644
index 0000000000..5a03f1d2e4
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ToDateFunction
+ * description: to_date(string1[, string2])--converts a date string string1 with format string2 (by default
+ * 'yyyy-MM-dd') to a date, returned as a string in the 'yyyy-MM-dd' format
+ * - returns NULL if any of arguments is NULL
+ */
+public class ToDateFunction implements ValueParser {
+
+    private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
+    // input formatters cached per pattern; DateTimeFormatter is immutable, so instances can be shared
+    private static final Map<String, DateTimeFormatter> INPUT_FORMATTERS = new ConcurrentHashMap<>();
+    private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
+
+    private final ValueParser stringParser1;
+    // null when the optional format argument is absent
+    private final ValueParser stringParser2;
+
+    /**
+     * Constructor
+     *
+     * @param expr the to_date call; parameter 0 is the date string and the optional parameter 1 is its format
+     */
+    public ToDateFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        // Determine the number of arguments and build parser
+        stringParser1 = OperatorTools.buildParser(expressions.get(0));
+        stringParser2 = expressions.size() == 2 ? OperatorTools.buildParser(expressions.get(1)) : null;
+    }
+
+    /**
+     * parse
+     *
+     * @param sourceData the decoded source record
+     * @param rowIndex the index of the row being evaluated
+     * @param context the transform context
+     * @return the date formatted as 'yyyy-MM-dd', or null if any argument is null
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context);
+        // same NULL contract as LocateFunction: a NULL argument yields NULL
+        if (stringObj1 == null) {
+            return null;
+        }
+        String str1 = OperatorTools.parseString(stringObj1);
+        String str2 = DEFAULT_PATTERN;
+        if (stringParser2 != null) {
+            Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context);
+            if (stringObj2 == null) {
+                return null;
+            }
+            str2 = OperatorTools.parseString(stringObj2);
+        }
+        // a string that does not match the pattern still fails with DateTimeParseException
+        LocalDate date = LocalDate.parse(str1, getDateTimeFormatter(str2));
+        return date.format(OUTPUT_FORMATTER);
+    }
+
+    /**
+     * getDateTimeFormatter
+     *
+     * @param pattern the DateTimeFormatter pattern
+     * @return the cached formatter for the pattern, created on first use
+     */
+    private DateTimeFormatter getDateTimeFormatter(String pattern) {
+        return INPUT_FORMATTERS.computeIfAbsent(pattern, DateTimeFormatter::ofPattern);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 94cbba090c..bc97d11148 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -19,14 +19,18 @@ package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.process.function.AbsFunction;
 import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
+import org.apache.inlong.sdk.transform.process.function.DateFormatFunction;
 import org.apache.inlong.sdk.transform.process.function.ExpFunction;
 import org.apache.inlong.sdk.transform.process.function.LnFunction;
+import org.apache.inlong.sdk.transform.process.function.LocateFunction;
 import org.apache.inlong.sdk.transform.process.function.Log10Function;
 import org.apache.inlong.sdk.transform.process.function.Log2Function;
 import org.apache.inlong.sdk.transform.process.function.LogFunction;
 import org.apache.inlong.sdk.transform.process.function.NowFunction;
 import org.apache.inlong.sdk.transform.process.function.PowerFunction;
 import org.apache.inlong.sdk.transform.process.function.SqrtFunction;
+import org.apache.inlong.sdk.transform.process.function.SubstringFunction;
+import org.apache.inlong.sdk.transform.process.function.ToDateFunction;
 import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
 import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
 import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
@@ -85,6 +89,10 @@ public class OperatorTools {
         functionMap.put("log2", Log2Function::new);
         functionMap.put("log", LogFunction::new);
         functionMap.put("exp", ExpFunction::new);
+        functionMap.put("substring", SubstringFunction::new);
+        functionMap.put("locate", LocateFunction::new);
+        functionMap.put("to_date", ToDateFunction::new);
+        functionMap.put("date_format", DateFormatFunction::new);
     }
 
     public static ExpressionOperator buildOperator(Expression expr) {
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
new file mode 100644
index 0000000000..8dabad12f6
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * TestTransformStringFunctionsProcessor
+ * description: test the string functions in transform processor
+ */
+public class TestTransformStringFunctionsProcessor {
+
+    private static final List<FieldInfo> srcFields = new ArrayList<>();
+    private static final List<FieldInfo> dstFields = new ArrayList<>();
+    private static final CsvSourceInfo csvSource;
+    private static final KvSinkInfo kvSink;
+    static {
+        // source schema: string1|string2|string3|numeric1|numeric2|numeric3
+        for (int i = 1; i < 4; i++) {
+            FieldInfo field = new FieldInfo();
+            field.setName("string" + i);
+            srcFields.add(field);
+        }
+        for (int i = 1; i < 4; i++) {
+            FieldInfo field = new FieldInfo();
+            field.setName("numeric" + i);
+            srcFields.add(field);
+        }
+        // sink schema: a single field named "result"
+        FieldInfo field = new FieldInfo();
+        field.setName("result");
+        dstFields.add(field);
+        csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+        kvSink = new KvSinkInfo("UTF-8", dstFields);
+    }
+
+    /**
+     * Builds a processor that decodes the shared CSV source, applies the SQL and encodes to the shared KV sink.
+     *
+     * @param transformSql the transform SQL under test
+     * @return the processor
+     */
+    private static TransformProcessor<String, String> createProcessor(String transformSql) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        return TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+    }
+
+    @Test
+    public void testSubstringFunction() throws Exception {
+        String transformSql1 = "select substring(string2, numeric1) from source";
+        TransformProcessor<String, String> processor1 = createProcessor(transformSql1);
+        // case1: substring('banana', 2)
+        List<String> output1 = processor1.transform("apple|banana|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals("result=anana", output1.get(0));
+        String transformSql2 = "select substring(string1, numeric1, numeric3) from source";
+        TransformProcessor<String, String> processor2 = createProcessor(transformSql2);
+        // case2: substring('apple', 1, 3)
+        List<String> output2 = processor2.transform("apple|banana|cloud|1|1|3", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals("result=app", output2.get(0));
+        // case3: substring('apple', 2, 9)
+        List<String> output3 = processor2.transform("apple|banana|cloud|2|1|9", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals("result=pple", output3.get(0));
+    }
+
+    @Test
+    public void testLocateFunction() throws Exception {
+        String transformSql1 = "select locate(string1, string2) from source";
+        TransformProcessor<String, String> processor1 = createProcessor(transformSql1);
+        // case1: locate('app', 'apple')
+        List<String> output1 = processor1.transform("app|apple|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals("result=1", output1.get(0));
+        // case2: locate('ape', 'apple')
+        List<String> output2 = processor1.transform("ape|apple|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals("result=0", output2.get(0));
+        String transformSql2 = "select locate(string1, string2, numeric1) from source";
+        TransformProcessor<String, String> processor2 = createProcessor(transformSql2);
+        // case3: locate('app', 'appapp', 2)
+        List<String> output3 = processor2.transform("app|appapp|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals("result=4", output3.get(0));
+        // case4: locate('app', 'appape', 2)
+        List<String> output4 = processor2.transform("app|appape|cloud|2|1|9", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals("result=0", output4.get(0));
+        // case5: locate('app', null)
+        List<String> output5 = processor1.transform("app", new HashMap<>());
+        Assert.assertEquals(1, output5.size());
+        Assert.assertEquals("result=null", output5.get(0));
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
new file mode 100644
index 0000000000..25675b25f3
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * TestTransformTemporalFunctionsProcessor
+ * description: test the temporal functions in transform processor
+ */
+public class TestTransformTemporalFunctionsProcessor {
+
+    private static final List<FieldInfo> srcFields = new ArrayList<>();
+    private static final List<FieldInfo> dstFields = new ArrayList<>();
+    private static final CsvSourceInfo csvSource;
+    private static final KvSinkInfo kvSink;
+    static {
+        // source schema: string1|string2|string3|numeric1|numeric2|numeric3
+        for (int i = 1; i < 4; i++) {
+            FieldInfo field = new FieldInfo();
+            field.setName("string" + i);
+            srcFields.add(field);
+        }
+        for (int i = 1; i < 4; i++) {
+            FieldInfo field = new FieldInfo();
+            field.setName("numeric" + i);
+            srcFields.add(field);
+        }
+        // sink schema: a single field named "result"
+        FieldInfo field = new FieldInfo();
+        field.setName("result");
+        dstFields.add(field);
+        csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+        kvSink = new KvSinkInfo("UTF-8", dstFields);
+    }
+
+    // JVM default time zone before the test, restored in tearDown so it does not leak into other tests
+    private TimeZone originalTimeZone;
+
+    @Before
+    public void setUp() {
+        originalTimeZone = TimeZone.getDefault();
+        // the expected date_format results below are wall-clock times in Asia/Shanghai
+        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
+    }
+
+    @org.junit.After
+    public void tearDown() {
+        TimeZone.setDefault(originalTimeZone);
+    }
+
+    /**
+     * Builds a processor that decodes the shared CSV source, applies the SQL and encodes to the shared KV sink.
+     *
+     * @param transformSql the transform SQL under test
+     * @return the processor
+     */
+    private static TransformProcessor<String, String> createProcessor(String transformSql) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        return TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+    }
+
+    @Test
+    public void testToDateFunction() throws Exception {
+        String transformSql1 = "select to_date(string1) from source";
+        TransformProcessor<String, String> processor1 = createProcessor(transformSql1);
+        // case1: to_date('2024-08-15')
+        List<String> output1 = processor1.transform("2024-08-15|apple|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals("result=2024-08-15", output1.get(0));
+        String transformSql2 = "select to_date(string1, string2) from source";
+        TransformProcessor<String, String> processor2 = createProcessor(transformSql2);
+        // case2: to_date('20240815', 'yyyyMMdd')
+        List<String> output2 = processor2.transform("20240815|yyyyMMdd|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals("result=2024-08-15", output2.get(0));
+        // case3: to_date('08152024', 'MMddyyyy')
+        List<String> output3 = processor2.transform("08152024|MMddyyyy|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals("result=2024-08-15", output3.get(0));
+        // case4: to_date('2024/08/15', 'yyyy/MM/dd')
+        List<String> output4 = processor2.transform("2024/08/15|yyyy/MM/dd|cloud|2|1|3", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals("result=2024-08-15", output4.get(0));
+    }
+
+    @Test
+    public void testDateFormatFunction() throws Exception {
+        String transformSql = "select date_format(numeric1, string1) from source";
+        TransformProcessor<String, String> processor1 = createProcessor(transformSql);
+        // case1: date_format(1722524216, 'yyyy-MM-dd HH:mm:ss')
+        List<String> output1 = processor1.transform("yyyy-MM-dd HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals("result=2024-08-01 22:56:56", output1.get(0));
+        // case2: date_format(1722524216, 'yyyy-MM-dd')
+        List<String> output2 = processor1.transform("yyyy-MM-dd|apple|cloud|1722524216|1|3", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals("result=2024-08-01", output2.get(0));
+        // case3: date_format(1722524216, 'yyyyMMddHHmmss')
+        List<String> output3 = processor1.transform("yyyyMMddHHmmss|apple|cloud|1722524216|1|3", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals("result=20240801225656", output3.get(0));
+        // case4: date_format(1722524216, 'yyyy/MM/dd HH:mm:ss')
+        List<String> output4 = processor1.transform("yyyy/MM/dd HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals("result=2024/08/01 22:56:56", output4.get(0));
+    }
+}

Reply via email to