This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b82f40f746 [INLONG-11154][SDK] Transform SQL supports TIMESTAMP
function (#11171)
b82f40f746 is described below
commit b82f40f74655d4bf143f726ac29148b2db037e7d
Author: Zkplo <[email protected]>
AuthorDate: Wed Sep 25 12:31:12 2024 +0800
[INLONG-11154][SDK] Transform SQL supports TIMESTAMP function (#11171)
Co-authored-by: ZKpLo <[email protected]>
---
.../process/function/TimestampFunction.java | 89 ++++++++++++++++++++++
.../sdk/transform/process/utils/DateUtil.java | 79 ++++++++++++++-----
.../function/temporal/TestTimestampFunction.java | 86 +++++++++++++++++++++
3 files changed, 233 insertions(+), 21 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java
new file mode 100644
index 0000000000..698942badf
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.utils.DateUtil;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+
+/**
+ * TimestampFunction -> timestamp(expr1[, expr2])
+ * description:
+ * - return NULL if expr1 or expr2 is NULL.
+ * - return the date or datetime expression expr as a datetime value if there
is only one parameter
+ * - return the result of the date or date time expression expr1 plus the time
expression expr2 if there are two parameters
+ */
+@TransformFunction(names = {"timestamp"})
+public class TimestampFunction implements ValueParser {
+
+ private ValueParser dateTimeExprParser;
+ private ValueParser timeExprParser;
+
+ public TimestampFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ dateTimeExprParser = OperatorTools.buildParser(expressions.get(0));
+ if (expressions.size() == 2) {
+ timeExprParser = OperatorTools.buildParser(expressions.get(1));
+ }
+ }
+
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object dateTimeExprObj = dateTimeExprParser.parse(sourceData,
rowIndex, context);
+ if (dateTimeExprObj == null) {
+ return null;
+ }
+ String dateTimeStr = dateTimeExprObj.toString();
+ LocalDateTime localDateTime =
DateUtil.parseLocalDateTime(dateTimeExprObj.toString());
+ if (localDateTime == null) {
+ // Not meeting the format requirements
+ return null;
+ }
+ boolean hasMicroSecond = dateTimeStr.indexOf('.') != -1;
+ String formatStr = DateUtil.YEAR_TO_SECOND;
+ // Support the second parameter
+ if (timeExprParser != null) {
+ Object timeExprObj = timeExprParser.parse(sourceData, rowIndex,
context);
+ if (timeExprObj != null) {
+ String timeStr = timeExprObj.toString();
+ LocalTime localTime = DateUtil.parseLocalTime(timeStr);
+ if (localTime == null) {
+ // Not meeting the format requirements
+ return null;
+ }
+ hasMicroSecond |= timeStr.indexOf('.') != -1;
+ localDateTime = DateUtil.dateAdd(localDateTime, localTime);
+ } else {
+ return null;
+ }
+ }
+ if (hasMicroSecond) {
+ formatStr = DateUtil.YEAR_TO_MICRO;
+ }
+ return localDateTime.format(DateUtil.getDateTimeFormatter(formatStr));
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
index 998f838cf2..e703bb543f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
@@ -24,19 +24,28 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
-import java.util.Arrays;
-import java.util.List;
+import java.util.LinkedHashMap;
import java.util.Map;
public class DateUtil {
// Need to follow this order
- private static final List<DateTimeFormatter> DATE_TIME_FORMATTER_LIST =
Arrays.asList(
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
- DateTimeFormatter.ofPattern("yyyy-MM-dd"));
- private static final List<DateTimeFormatter> TIME_FORMATTER_LIST =
Arrays.asList(
- DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"),
DateTimeFormatter.ofPattern("HH:mm:ss"));
+ private static final Map<String, DateTimeFormatter>
DATE_TIME_FORMATTER_MAP = new LinkedHashMap<>();
+ private static final Map<String, DateTimeFormatter> TIME_FORMATTER_MAP =
new LinkedHashMap<>();
+ public static String YEAR_TO_MICRO = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ public static String YEAR_TO_SECOND = "yyyy-MM-dd HH:mm:ss";
+ public static String YEAR_TO_MONTH = "yyyy-MM-dd";
+ public static String HOUR_TO_MICRO = "HH:mm:ss.SSSSSS";
+ public static String HOUR_TO_SECOND = "HH:mm:ss";
+
+ static {
+ DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MICRO,
DateTimeFormatter.ofPattern(YEAR_TO_MICRO));
+ DATE_TIME_FORMATTER_MAP.put(YEAR_TO_SECOND,
DateTimeFormatter.ofPattern(YEAR_TO_SECOND));
+ DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MONTH,
DateTimeFormatter.ofPattern(YEAR_TO_MONTH));
+
+ TIME_FORMATTER_MAP.put(HOUR_TO_MICRO,
DateTimeFormatter.ofPattern(HOUR_TO_MICRO));
+ TIME_FORMATTER_MAP.put(HOUR_TO_SECOND,
DateTimeFormatter.ofPattern(HOUR_TO_SECOND));
+ }
/**
* Time calculation
@@ -56,8 +65,27 @@ public class DateUtil {
return null;
}
- Object dateParserObj = null;
- for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_LIST) {
+ Object dateParserObj = parseLocalDateTime(dateStr);
+ if (dateParserObj != null) {
+ return addDateTime(intervalPair, sign, (LocalDateTime)
dateParserObj, dateStr);
+ }
+ dateParserObj = parseLocalTime(dateStr);
+ if (dateParserObj != null) {
+ return addTime(intervalPair, sign, (LocalTime) dateParserObj,
dateStr);
+ }
+ return null;
+ }
+
+ public static LocalDateTime dateAdd(LocalDateTime localDateTime, LocalTime
localTime) {
+ return localDateTime.plusHours(localTime.getHour())
+ .plusMinutes(localTime.getMinute())
+ .plusSeconds(localTime.getSecond())
+ .plusNanos(localTime.getNano());
+ }
+
+ public static LocalDateTime parseLocalDateTime(String dateStr) {
+ LocalDateTime dateParserObj = null;
+ for (DateTimeFormatter dateTimeFormatter :
DATE_TIME_FORMATTER_MAP.values()) {
try {
dateParserObj = LocalDateTime.parse(dateStr,
dateTimeFormatter);
} catch (Exception e) {
@@ -68,24 +96,35 @@ public class DateUtil {
}
}
if (dateParserObj != null) {
- return addDateTime(intervalPair, sign, (LocalDateTime)
dateParserObj, dateStr);
+ return dateParserObj;
}
}
+ return null;
+ }
- for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_LIST) {
+ public static LocalTime parseLocalTime(String dateStr) {
+ LocalTime dateParserObj = null;
+ for (DateTimeFormatter dateTimeFormatter :
TIME_FORMATTER_MAP.values()) {
try {
dateParserObj = LocalTime.parse(dateStr, dateTimeFormatter);
} catch (Exception ignored) {
}
if (dateParserObj != null) {
- return addTime(intervalPair, sign, (LocalTime) dateParserObj,
dateStr);
+ return dateParserObj;
}
}
-
return null;
}
+ public static DateTimeFormatter getDateTimeFormatter(String formatStr) {
+ DateTimeFormatter formatter = DATE_TIME_FORMATTER_MAP.get(formatStr);
+ if (formatter != null) {
+ return formatter;
+ }
+ return TIME_FORMATTER_MAP.get(formatStr);
+ }
+
private static String addDateTime(Pair<Integer, Map<ChronoField, Long>>
intervalPair, int sign,
LocalDateTime dateTime, String dataStr) {
int factor = intervalPair.getKey();
@@ -127,16 +166,14 @@ public class DateUtil {
return null;
}
}
-
- String result = dateTime.toLocalDate().toString();
if (hasTime) {
if (hasMicroSecond) {
- result += " " +
dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(0));
+ return
dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_MICRO));
} else {
- result += " " +
dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(1));
+ return
dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_SECOND));
}
}
- return result;
+ return dateTime.toLocalDate().toString();
}
private static String addTime(Pair<Integer, Map<ChronoField, Long>>
intervalPair, int sign, LocalTime time,
@@ -168,9 +205,9 @@ public class DateUtil {
}
if (hasMicroSecond) {
- return time.format(TIME_FORMATTER_LIST.get(0));
+ return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_MICRO));
} else {
- return time.format(TIME_FORMATTER_LIST.get(1));
+ return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_SECOND));
}
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java
new file mode 100644
index 0000000000..1efb66cd6b
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.temporal;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestTimestampFunction extends AbstractFunctionTemporalTestBase {
+
+ @Test
+ public void testTimestamp() throws Exception {
+ String transformSql = null;
+ TransformConfig config = null;
+ TransformProcessor<String, String> processor = null;
+ List<String> output = null;
+
+ transformSql = "select timestamp(string1,string2) from source";
+ config = new TransformConfig(transformSql);
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: timestamp('2003-12-31 12:00:00.600000','12:00:00')
+ output = processor.transform("2003-12-31 12:00:00.600000|12:00:00",
new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=2004-01-01 00:00:00.600000",
output.get(0));
+
+ // case2: timestamp('2003-12-31 12:00:00','12:00:00.600000')
+ output = processor.transform("2003-12-31 12:00:00|12:00:00.600000",
new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=2004-01-01 00:00:00.600000",
output.get(0));
+
+ // case3: timestamp('2003-12-31','12:00:00.600000')
+ output = processor.transform("2003-12-31|12:00:00.600000", new
HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=2003-12-31 12:00:00.600000",
output.get(0));
+
+ transformSql = "select timestamp(string1,stringx) from source";
+ config = new TransformConfig(transformSql);
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case4: timestamp('2003-12-31 12:00:00.600000',null)
+ output = processor.transform("2003-12-31 12:00:00.600000|12:00:00",
new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=", output.get(0));
+
+ transformSql = "select timestamp(string1) from source";
+ config = new TransformConfig(transformSql);
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case5: timestamp('2003-12-31 12:00:00')
+ output = processor.transform("2003-12-31 12:00:00", new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=2003-12-31 12:00:00", output.get(0));
+
+ // case6: timestamp('2003-12-31')
+ output = processor.transform("2003-12-31", new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=2003-12-31 00:00:00", output.get(0));
+
+ }
+}