This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8a27664530 [INLONG-10618][SDK] Transform SQL support common
functions(Including substring, locate, to_date and date_format) (#10744)
8a27664530 is described below
commit 8a2766453099e281e3a4d9f490207737a9f7cda1
Author: LeeWY <[email protected]>
AuthorDate: Tue Aug 6 19:03:46 2024 +0800
[INLONG-10618][SDK] Transform SQL support common functions(Including
substring, locate, to_date and date_format) (#10744)
* [INLONG-10618][SDK] Transform SQL support common functions(Including
substring, locate, to_date and date_format)
* [INLONG-10618][SDK] Fix unit test failure, set time zone to Shanghai
* [INLONG-10618][SDK] Make the DateTimeFormatter object reusable to avoid
creating multiple identical DateTimeFormatter objects.
* [INLONG-10618][SDK] Make the SimpleDateFormat object reusable to avoid
creating multiple identical SimpleDateFormat objects.
---------
Co-authored-by: jameswyli <[email protected]>
---
.../process/function/DateFormatFunction.java | 90 +++++++++++++++
.../transform/process/function/LocateFunction.java | 87 +++++++++++++++
.../process/function/SubstringFunction.java | 81 ++++++++++++++
.../transform/process/function/ToDateFunction.java | 93 ++++++++++++++++
.../transform/process/operator/OperatorTools.java | 8 ++
.../TestTransformStringFunctionsProcessor.java | 121 ++++++++++++++++++++
.../TestTransformTemporalFunctionsProcessor.java | 123 +++++++++++++++++++++
7 files changed, 603 insertions(+)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java
new file mode 100644
index 0000000000..9e233ff882
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DateFormatFunction
+ * description: date_format(timestamp,format)--converts timestamp(in seconds)
to a value of string in the format
+ * specified by the date format string. The format string is compatible with
Java’s SimpleDateFormat
+ */
+public class DateFormatFunction implements ValueParser {
+
+ private ValueParser timestampParser;
+ private ValueParser formatParser;
+ private static final Map<String, SimpleDateFormat> SIMPLE_DATE_FORMATS =
new ConcurrentHashMap<>();
+
+ /**
+ * Constructor
+ *
+ * @param expr
+ */
+ public DateFormatFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ timestampParser = OperatorTools.buildParser(expressions.get(0));
+ formatParser = OperatorTools.buildParser(expressions.get(1));
+ }
+
+ /**
+ * parse
+ *
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object timestampObj = timestampParser.parse(sourceData, rowIndex,
context);
+ Object formatObj = formatParser.parse(sourceData, rowIndex, context);
+ BigDecimal timestamp = OperatorTools.parseBigDecimal(timestampObj);
+ String format = OperatorTools.parseString(formatObj);
+ SimpleDateFormat sdf = getSimpleDateFormat(format);
+ // the timestamp is in seconds, multiply 1000 to get milliseconds
+ Date date = new Date(timestamp.longValue() * 1000);
+ return sdf.format(date);
+ }
+
+ /**
+ * getSimpleDateFormat
+ *
+ * @param pattern
+ * @return
+ */
+ private SimpleDateFormat getSimpleDateFormat(String pattern) {
+ SimpleDateFormat sdf = SIMPLE_DATE_FORMATS.get(pattern);
+ if (sdf == null) {
+ sdf = new SimpleDateFormat(pattern);
+ SIMPLE_DATE_FORMATS.put(pattern, sdf);
+ }
+ return sdf;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java
new file mode 100644
index 0000000000..e300815eec
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * LocateFunction
+ * description: locate(string1, string2[, integer])
+ * - returns the position of the first occurrence of string1 in string2 after
position integer
+ * - returns 0 if not found
+ * - returns NULL if any of arguments is NULL
+ */
+public class LocateFunction implements ValueParser {
+
+ private ValueParser stringParser1;
+ private ValueParser stringParser2;
+ private ValueParser startPositionParser;
+
+ /**
+ * Constructor
+ *
+ * @param expr
+ */
+ public LocateFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ // Determine the number of arguments and build parser
+ stringParser1 = OperatorTools.buildParser(expressions.get(0));
+ stringParser2 = OperatorTools.buildParser(expressions.get(1));
+ if (expressions.size() == 3) {
+ startPositionParser =
OperatorTools.buildParser(expressions.get(2));
+ }
+ }
+
+ /**
+ * parse
+ *
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context);
+ Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context);
+ // If any of arguments is null, return null
+ if (stringObj1 == null || stringObj2 == null) {
+ return null;
+ }
+ String str1 = OperatorTools.parseString(stringObj1);
+ String str2 = OperatorTools.parseString(stringObj2);
+ if (startPositionParser != null) {
+ Object startPositionObj = startPositionParser.parse(sourceData,
rowIndex, context);
+ // if startPositionObj is null, return null
+ if (startPositionObj == null) {
+ return null;
+ }
+ int startPosition =
OperatorTools.parseBigDecimal(startPositionObj).intValue();
+ return str2.indexOf(str1, startPosition - 1) + 1;
+ } else {
+ return str2.indexOf(str1) + 1;
+ }
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java
new file mode 100644
index 0000000000..063686aa55
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * SubstringFunction
+ * description: substring(string FROM INT1 [ FOR INT2 ])--returns a substring
of STRING starting from position INT1 with
+ * length INT2 (to the end by default)
+ */
+public class SubstringFunction implements ValueParser {
+
+ private ValueParser stringParser;
+ private ValueParser startPositionParser;
+ private ValueParser lengthParser;
+
+ /**
+ * Constructor
+ * @param expr
+ */
+ public SubstringFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ // Determine the number of arguments and build parser
+ stringParser = OperatorTools.buildParser(expressions.get(0));
+ startPositionParser = OperatorTools.buildParser(expressions.get(1));
+ if (expressions.size() == 3) {
+ lengthParser = OperatorTools.buildParser(expressions.get(2));
+ }
+ }
+
+ /**
+ * parse
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object stringObj = stringParser.parse(sourceData, rowIndex, context);
+ Object startPositionObj = startPositionParser.parse(sourceData,
rowIndex, context);
+ String str = OperatorTools.parseString(stringObj);
+ int start = OperatorTools.parseBigDecimal(startPositionObj).intValue();
+ if (start > str.length()) {
+ return "";
+ }
+ if (lengthParser != null) {
+ Object lengthObj = lengthParser.parse(sourceData, rowIndex,
context);
+ int len = OperatorTools.parseBigDecimal(lengthObj).intValue();
+ if (len <= 0) {
+ return "";
+ }
+ return str.substring(Math.max(start - 1, 0), Math.min(start - 1 +
len, str.length()));
+ } else {
+ return str.substring(Math.max(start - 1, 0));
+ }
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java
new file mode 100644
index 0000000000..5a03f1d2e4
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ToDateFunction
+ * description: to_date(string1[, string2])--converts a date string string1
with format string2 (by default ‘yyyy-MM-dd’) to a date
+ */
+public class ToDateFunction implements ValueParser {
+
+ private ValueParser stringParser1;
+ private ValueParser stringParser2;
+ private static final Map<String, DateTimeFormatter> INPUT_FORMATTERS = new
ConcurrentHashMap<>();
+ private static final DateTimeFormatter OUTPUT_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+ /**
+ * Constructor
+ *
+ * @param expr
+ */
+ public ToDateFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ // Determine the number of arguments and build parser
+ stringParser1 = OperatorTools.buildParser(expressions.get(0));
+ if (expressions.size() == 2) {
+ stringParser2 = OperatorTools.buildParser(expressions.get(1));
+ }
+ }
+
+ /**
+ * parse
+ *
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context);
+ String str1 = OperatorTools.parseString(stringObj1);
+ String str2 = "yyyy-MM-dd";
+ if (stringParser2 != null) {
+ Object stringObj2 = stringParser2.parse(sourceData, rowIndex,
context);
+ str2 = OperatorTools.parseString(stringObj2);
+ }
+ LocalDate date = LocalDate.parse(str1, getDateTimeFormatter(str2));
+ return date.format(OUTPUT_FORMATTER);
+ }
+
+ /**
+ * getDateTimeFormatter
+ *
+ * @param pattern
+ * @return
+ */
+ private DateTimeFormatter getDateTimeFormatter(String pattern) {
+ DateTimeFormatter formatter = INPUT_FORMATTERS.get(pattern);
+ if (formatter == null) {
+ formatter = DateTimeFormatter.ofPattern(pattern);
+ INPUT_FORMATTERS.put(pattern, formatter);
+ }
+ return formatter;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 94cbba090c..bc97d11148 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -19,14 +19,18 @@ package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.process.function.AbsFunction;
import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
+import org.apache.inlong.sdk.transform.process.function.DateFormatFunction;
import org.apache.inlong.sdk.transform.process.function.ExpFunction;
import org.apache.inlong.sdk.transform.process.function.LnFunction;
+import org.apache.inlong.sdk.transform.process.function.LocateFunction;
import org.apache.inlong.sdk.transform.process.function.Log10Function;
import org.apache.inlong.sdk.transform.process.function.Log2Function;
import org.apache.inlong.sdk.transform.process.function.LogFunction;
import org.apache.inlong.sdk.transform.process.function.NowFunction;
import org.apache.inlong.sdk.transform.process.function.PowerFunction;
import org.apache.inlong.sdk.transform.process.function.SqrtFunction;
+import org.apache.inlong.sdk.transform.process.function.SubstringFunction;
+import org.apache.inlong.sdk.transform.process.function.ToDateFunction;
import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
@@ -85,6 +89,10 @@ public class OperatorTools {
functionMap.put("log2", Log2Function::new);
functionMap.put("log", LogFunction::new);
functionMap.put("exp", ExpFunction::new);
+ functionMap.put("substring", SubstringFunction::new);
+ functionMap.put("locate", LocateFunction::new);
+ functionMap.put("to_date", ToDateFunction::new);
+ functionMap.put("date_format", DateFormatFunction::new);
}
public static ExpressionOperator buildOperator(Expression expr) {
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
new file mode 100644
index 0000000000..8dabad12f6
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * TestTransformStringFunctionsProcessor
+ * description: test the string functions in transform processor
+ */
+public class TestTransformStringFunctionsProcessor {
+
+ private static final List<FieldInfo> srcFields = new ArrayList<>();
+ private static final List<FieldInfo> dstFields = new ArrayList<>();
+ private static final CsvSourceInfo csvSource;
+ private static final KvSinkInfo kvSink;
+ static {
+ for (int i = 1; i < 4; i++) {
+ FieldInfo field = new FieldInfo();
+ field.setName("string" + i);
+ srcFields.add(field);
+ }
+ for (int i = 1; i < 4; i++) {
+ FieldInfo field = new FieldInfo();
+ field.setName("numeric" + i);
+ srcFields.add(field);
+ }
+ FieldInfo field = new FieldInfo();
+ field.setName("result");
+ dstFields.add(field);
+ csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+ kvSink = new KvSinkInfo("UTF-8", dstFields);
+ }
+
+ @Test
+ public void testSubstringFunction() throws Exception {
+ String transformSql1 = "select substring(string2, numeric1) from
source";
+ TransformConfig config1 = new TransformConfig(transformSql1);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config1,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: substring('banana', 2)
+ List<String> output1 =
processor1.transform("apple|banana|cloud|2|1|3", new HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=anana");
+ String transformSql2 = "select substring(string1, numeric1, numeric3)
from source";
+ TransformConfig config2 = new TransformConfig(transformSql2);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config2,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case2: substring('apple', 1, 3)
+ List<String> output2 =
processor2.transform("apple|banana|cloud|1|1|3", new HashMap<>());
+ Assert.assertEquals(1, output2.size());
+ Assert.assertEquals(output2.get(0), "result=app");
+ // case3: substring('apple', 2, 9)
+ List<String> output3 =
processor2.transform("apple|banana|cloud|2|1|9", new HashMap<>());
+ Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "result=pple");
+ }
+
+ @Test
+ public void testLocateFunction() throws Exception {
+ String transformSql1 = "select locate(string1, string2) from source";
+ TransformConfig config1 = new TransformConfig(transformSql1);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config1,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: locate('app', 'apple')
+ List<String> output1 = processor1.transform("app|apple|cloud|2|1|3",
new HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=1");
+ // case2: locate('ape', 'apple')
+ List<String> output2 = processor1.transform("ape|apple|cloud|2|1|3",
new HashMap<>());
+ Assert.assertEquals(1, output2.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+ String transformSql2 = "select locate(string1, string2, numeric1) from
source";
+ TransformConfig config2 = new TransformConfig(transformSql2);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config2,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case3: locate('app', 'appapp', 2)
+ List<String> output3 = processor2.transform("app|appapp|cloud|2|1|3",
new HashMap<>());
+ Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "result=4");
+ // case4: locate('app', 'appape', 2)
+ List<String> output4 = processor2.transform("app|appape|cloud|2|1|9",
new HashMap<>());
+ Assert.assertEquals(1, output4.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+ // case5: locate('app', null)
+ List<String> output5 = processor1.transform("app", new HashMap<>());
+ Assert.assertEquals(1, output5.size());
+ Assert.assertEquals(output5.get(0), "result=null");
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
new file mode 100644
index 0000000000..25675b25f3
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * TestTransformTemporalFunctionsProcessor
+ * description: test the temporal functions in transform processor
+ */
+public class TestTransformTemporalFunctionsProcessor {
+
+ private static final List<FieldInfo> srcFields = new ArrayList<>();
+ private static final List<FieldInfo> dstFields = new ArrayList<>();
+ private static final CsvSourceInfo csvSource;
+ private static final KvSinkInfo kvSink;
+ static {
+ for (int i = 1; i < 4; i++) {
+ FieldInfo field = new FieldInfo();
+ field.setName("string" + i);
+ srcFields.add(field);
+ }
+ for (int i = 1; i < 4; i++) {
+ FieldInfo field = new FieldInfo();
+ field.setName("numeric" + i);
+ srcFields.add(field);
+ }
+ FieldInfo field = new FieldInfo();
+ field.setName("result");
+ dstFields.add(field);
+ csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+ kvSink = new KvSinkInfo("UTF-8", dstFields);
+ }
+
+ @Before
+ public void setUp() {
+ TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
+ }
+
+ @Test
+ public void testToDateFunction() throws Exception {
+ String transformSql1 = "select to_date(string1) from source";
+ TransformConfig config1 = new TransformConfig(transformSql1);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config1,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: to_date('2024-08-15')
+ List<String> output1 =
processor1.transform("2024-08-15|apple|cloud|2|1|3", new HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=2024-08-15");
+ String transformSql2 = "select to_date(string1, string2) from source";
+ TransformConfig config2 = new TransformConfig(transformSql2);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config2,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case2: to_date('20240815', 'yyyyMMdd')
+ List<String> output2 =
processor2.transform("20240815|yyyyMMdd|cloud|2|1|3", new HashMap<>());
+ Assert.assertEquals(1, output2.size());
+ Assert.assertEquals(output2.get(0), "result=2024-08-15");
+ // case3: to_date('08152024', 'MMddyyyy')
+ List<String> output3 =
processor2.transform("08152024|MMddyyyy|cloud|2|1|3", new HashMap<>());
+ Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "result=2024-08-15");
+ // case4: to_date('2024/08/15', 'yyyy/MM/dd')
+ List<String> output4 =
processor2.transform("2024/08/15|yyyy/MM/dd|cloud|2|1|3", new HashMap<>());
+ Assert.assertEquals(1, output4.size());
+ Assert.assertEquals(output4.get(0), "result=2024-08-15");
+ }
+
+ @Test
+ public void testDateFormatFunction() throws Exception {
+ String transformSql = "select date_format(numeric1, string1) from
source";
+ TransformConfig config = new TransformConfig(transformSql);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: date_format(1722524216, 'yyyy-MM-dd HH:mm:ss')
+ List<String> output1 = processor1.transform("yyyy-MM-dd
HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=2024-08-01 22:56:56");
+ // case2: date_format(1722524216, 'yyyy-MM-dd')
+ List<String> output2 =
processor1.transform("yyyy-MM-dd|apple|cloud|1722524216|1|3", new HashMap<>());
+ Assert.assertEquals(1, output2.size());
+ Assert.assertEquals(output2.get(0), "result=2024-08-01");
+ // case3: date_format(1722524216, 'yyyyMMddHHmmss')
+ List<String> output3 =
processor1.transform("yyyyMMddHHmmss|apple|cloud|1722524216|1|3", new
HashMap<>());
+ Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "result=20240801225656");
+ // case1: date_format(1722524216, 'yyyy/MM/dd HH:mm:ss')
+ List<String> output4 = processor1.transform("yyyy/MM/dd
HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>());
+ Assert.assertEquals(1, output4.size());
+ Assert.assertEquals(output4.get(0), "result=2024/08/01 22:56:56");
+ }
+}