This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b82f40f746 [INLONG-11154][SDK] Transform SQL supports TIMESTAMP function (#11171)
b82f40f746 is described below

commit b82f40f74655d4bf143f726ac29148b2db037e7d
Author: Zkplo <[email protected]>
AuthorDate: Wed Sep 25 12:31:12 2024 +0800

    [INLONG-11154][SDK] Transform SQL supports TIMESTAMP function (#11171)
    
    Co-authored-by: ZKpLo <[email protected]>
---
 .../process/function/TimestampFunction.java        | 89 ++++++++++++++++++++++
 .../sdk/transform/process/utils/DateUtil.java      | 79 ++++++++++++++-----
 .../function/temporal/TestTimestampFunction.java   | 86 +++++++++++++++++++++
 3 files changed, 233 insertions(+), 21 deletions(-)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java
new file mode 100644
index 0000000000..698942badf
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.utils.DateUtil;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+
+/**
+ * TimestampFunction  -> timestamp(expr1[, expr2])
+ * description:
+ * - return NULL if expr1 or expr2 is NULL.
+ * - return the date or datetime expression expr as a datetime value if there is only one parameter
+ * - return the result of the date or date time expression expr1 plus the time expression expr2 if there are two parameters
+ */
+@TransformFunction(names = {"timestamp"})
+public class TimestampFunction implements ValueParser {
+
+    private ValueParser dateTimeExprParser;
+    private ValueParser timeExprParser;
+
+    public TimestampFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        dateTimeExprParser = OperatorTools.buildParser(expressions.get(0));
+        if (expressions.size() == 2) {
+            timeExprParser = OperatorTools.buildParser(expressions.get(1));
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object dateTimeExprObj = dateTimeExprParser.parse(sourceData, rowIndex, context);
+        if (dateTimeExprObj == null) {
+            return null;
+        }
+        String dateTimeStr = dateTimeExprObj.toString();
+        LocalDateTime localDateTime = DateUtil.parseLocalDateTime(dateTimeExprObj.toString());
+        if (localDateTime == null) {
+            // Not meeting the format requirements
+            return null;
+        }
+        boolean hasMicroSecond = dateTimeStr.indexOf('.') != -1;
+        String formatStr = DateUtil.YEAR_TO_SECOND;
+        // Support the second parameter
+        if (timeExprParser != null) {
+            Object timeExprObj = timeExprParser.parse(sourceData, rowIndex, context);
+            if (timeExprObj != null) {
+                String timeStr = timeExprObj.toString();
+                LocalTime localTime = DateUtil.parseLocalTime(timeStr);
+                if (localTime == null) {
+                    // Not meeting the format requirements
+                    return null;
+                }
+                hasMicroSecond |= timeStr.indexOf('.') != -1;
+                localDateTime = DateUtil.dateAdd(localDateTime, localTime);
+            } else {
+                return null;
+            }
+        }
+        if (hasMicroSecond) {
+            formatStr = DateUtil.YEAR_TO_MICRO;
+        }
+        return localDateTime.format(DateUtil.getDateTimeFormatter(formatStr));
+    }
+}
\ No newline at end of file
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
index 998f838cf2..e703bb543f 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
@@ -24,19 +24,28 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoField;
-import java.util.Arrays;
-import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class DateUtil {
 
     // Need to follow this order
-    private static final List<DateTimeFormatter> DATE_TIME_FORMATTER_LIST = Arrays.asList(
-            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
-            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
-            DateTimeFormatter.ofPattern("yyyy-MM-dd"));
-    private static final List<DateTimeFormatter> TIME_FORMATTER_LIST = Arrays.asList(
-            DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"), DateTimeFormatter.ofPattern("HH:mm:ss"));
+    private static final Map<String, DateTimeFormatter> DATE_TIME_FORMATTER_MAP = new LinkedHashMap<>();
+    private static final Map<String, DateTimeFormatter> TIME_FORMATTER_MAP = new LinkedHashMap<>();
+    public static String YEAR_TO_MICRO = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+    public static String YEAR_TO_SECOND = "yyyy-MM-dd HH:mm:ss";
+    public static String YEAR_TO_MONTH = "yyyy-MM-dd";
+    public static String HOUR_TO_MICRO = "HH:mm:ss.SSSSSS";
+    public static String HOUR_TO_SECOND = "HH:mm:ss";
+
+    static {
+        DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MICRO, DateTimeFormatter.ofPattern(YEAR_TO_MICRO));
+        DATE_TIME_FORMATTER_MAP.put(YEAR_TO_SECOND, DateTimeFormatter.ofPattern(YEAR_TO_SECOND));
+        DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MONTH, DateTimeFormatter.ofPattern(YEAR_TO_MONTH));
+
+        TIME_FORMATTER_MAP.put(HOUR_TO_MICRO, DateTimeFormatter.ofPattern(HOUR_TO_MICRO));
+        TIME_FORMATTER_MAP.put(HOUR_TO_SECOND, DateTimeFormatter.ofPattern(HOUR_TO_SECOND));
+    }
 
     /**
      * Time calculation
@@ -56,8 +65,27 @@ public class DateUtil {
             return null;
         }
 
-        Object dateParserObj = null;
-        for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_LIST) {
+        Object dateParserObj = parseLocalDateTime(dateStr);
+        if (dateParserObj != null) {
+            return addDateTime(intervalPair, sign, (LocalDateTime) dateParserObj, dateStr);
+        }
+        dateParserObj = parseLocalTime(dateStr);
+        if (dateParserObj != null) {
+            return addTime(intervalPair, sign, (LocalTime) dateParserObj, dateStr);
+        }
+        return null;
+    }
+
+    public static LocalDateTime dateAdd(LocalDateTime localDateTime, LocalTime localTime) {
+        return localDateTime.plusHours(localTime.getHour())
+                .plusMinutes(localTime.getMinute())
+                .plusSeconds(localTime.getSecond())
+                .plusNanos(localTime.getNano());
+    }
+
+    public static LocalDateTime parseLocalDateTime(String dateStr) {
+        LocalDateTime dateParserObj = null;
+        for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_MAP.values()) {
             try {
                 dateParserObj = LocalDateTime.parse(dateStr, dateTimeFormatter);
             } catch (Exception e) {
@@ -68,24 +96,35 @@ public class DateUtil {
                 }
             }
             if (dateParserObj != null) {
-                return addDateTime(intervalPair, sign, (LocalDateTime) dateParserObj, dateStr);
+                return dateParserObj;
             }
         }
+        return null;
+    }
 
-        for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_LIST) {
+    public static LocalTime parseLocalTime(String dateStr) {
+        LocalTime dateParserObj = null;
+        for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_MAP.values()) {
             try {
                 dateParserObj = LocalTime.parse(dateStr, dateTimeFormatter);
             } catch (Exception ignored) {
 
             }
             if (dateParserObj != null) {
-                return addTime(intervalPair, sign, (LocalTime) dateParserObj, dateStr);
+                return dateParserObj;
             }
         }
-
         return null;
     }
 
+    public static DateTimeFormatter getDateTimeFormatter(String formatStr) {
+        DateTimeFormatter formatter = DATE_TIME_FORMATTER_MAP.get(formatStr);
+        if (formatter != null) {
+            return formatter;
+        }
+        return TIME_FORMATTER_MAP.get(formatStr);
+    }
+
     private static String addDateTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign,
             LocalDateTime dateTime, String dataStr) {
         int factor = intervalPair.getKey();
@@ -127,16 +166,14 @@ public class DateUtil {
                     return null;
             }
         }
-
-        String result = dateTime.toLocalDate().toString();
         if (hasTime) {
             if (hasMicroSecond) {
-                result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(0));
+                return dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_MICRO));
             } else {
-                result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(1));
+                return dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_SECOND));
             }
         }
-        return result;
+        return dateTime.toLocalDate().toString();
     }
 
     private static String addTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign, LocalTime time,
@@ -168,9 +205,9 @@ public class DateUtil {
         }
 
         if (hasMicroSecond) {
-            return time.format(TIME_FORMATTER_LIST.get(0));
+            return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_MICRO));
         } else {
-            return time.format(TIME_FORMATTER_LIST.get(1));
+            return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_SECOND));
         }
     }
 
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java
new file mode 100644
index 0000000000..1efb66cd6b
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.temporal;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestTimestampFunction extends AbstractFunctionTemporalTestBase {
+
+    @Test
+    public void testTimestamp() throws Exception {
+        String transformSql = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select timestamp(string1,string2) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1: timestamp('2003-12-31 12:00:00.600000','12:00:00')
+        output = processor.transform("2003-12-31 12:00:00.600000|12:00:00", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=2004-01-01 00:00:00.600000", output.get(0));
+
+        // case2: timestamp('2003-12-31 12:00:00','12:00:00.600000')
+        output = processor.transform("2003-12-31 12:00:00|12:00:00.600000", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=2004-01-01 00:00:00.600000", output.get(0));
+
+        // case3: timestamp('2003-12-31','12:00:00.600000')
+        output = processor.transform("2003-12-31|12:00:00.600000", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=2003-12-31 12:00:00.600000", output.get(0));
+
+        transformSql = "select timestamp(string1,stringx) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case4: timestamp('2003-12-31 12:00:00.600000',null)
+        output = processor.transform("2003-12-31 12:00:00.600000|12:00:00", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=", output.get(0));
+
+        transformSql = "select timestamp(string1) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case5: timestamp('2003-12-31 12:00:00')
+        output = processor.transform("2003-12-31 12:00:00", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=2003-12-31 12:00:00", output.get(0));
+
+        // case6: timestamp('2003-12-31')
+        output = processor.transform("2003-12-31", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=2003-12-31 00:00:00", output.get(0));
+
+    }
+}

Reply via email to