This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d4df5eda31 [INLONG-11081][SDK] Transform SQL supports INTERVAL parse
(#11086)
d4df5eda31 is described below
commit d4df5eda31db2431da8afe391bc2f5b7ccc4418e
Author: Zkplo <[email protected]>
AuthorDate: Fri Sep 20 09:57:29 2024 +0800
[INLONG-11081][SDK] Transform SQL supports INTERVAL parse (#11086)
Co-authored-by: ZKpLo <[email protected]>
---
.../transform/process/parser/AdditionParser.java | 39 ++++-
.../transform/process/parser/IntervalParser.java | 154 ++++++++++++++++++
.../process/parser/SubtractionParser.java | 39 ++++-
.../sdk/transform/process/utils/DateUtil.java | 177 +++++++++++++++++++++
.../process/parser/AbstractParserTestBase.java | 5 +
.../process/parser/TestAdditionParser.java | 125 +++++++++++++++
.../process/parser/TestSubtractionParser.java | 125 +++++++++++++++
7 files changed, 650 insertions(+), 14 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
index 13536be30d..eea6ce7e17 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
@@ -20,14 +20,17 @@ package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.utils.DateUtil;
import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+import org.apache.commons.lang3.tuple.Pair;
import java.math.BigDecimal;
+import java.time.temporal.ChronoField;
+import java.util.Map;
/**
* AdditionParser
- *
*/
@TransformParser(values = Addition.class)
public class AdditionParser implements ValueParser {
@@ -41,16 +44,38 @@ public class AdditionParser implements ValueParser {
this.right = OperatorTools.buildParser(expr.getRightExpression());
}
- /**
- * parse
- * @param sourceData
- * @param rowIndex
- * @return
- */
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ if (this.left instanceof IntervalParser && this.right instanceof
IntervalParser) {
+ return null;
+ } else if (this.left instanceof IntervalParser || this.right
instanceof IntervalParser) {
+ IntervalParser intervalParser = null;
+ ValueParser dateParser = null;
+ if (this.left instanceof IntervalParser) {
+ intervalParser = (IntervalParser) this.left;
+ dateParser = this.right;
+ } else {
+ intervalParser = (IntervalParser) this.right;
+ dateParser = this.left;
+ }
+ Object intervalPairObj = intervalParser.parse(sourceData,
rowIndex, context);
+ Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+ if (intervalPairObj == null || dateObj == null) {
+ return null;
+ }
+ return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
+ (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj,
1);
+ } else {
+ return numericalOperation(sourceData, rowIndex, context);
+ }
+ }
+
+ private BigDecimal numericalOperation(SourceData sourceData, int rowIndex,
Context context) {
Object leftObj = this.left.parse(sourceData, rowIndex, context);
Object rightObj = this.right.parse(sourceData, rowIndex, context);
+ if (leftObj == null || rightObj == null) {
+ return null;
+ }
BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
return leftValue.add(rightValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java
new file mode 100644
index 0000000000..7266dcd63b
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.IntervalExpression;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Parses {@code INTERVAL expr unit} into {@code Pair(factor, Map(ChronoField, Count))}.
+ * <p>
+ * `factor`:
+ * <p>
+ * 1) `expr` may start with '-', which means subtraction, so the sign of `factor`
+ * records whether `expr` started with '-'.
+ * <p>
+ * 2) WEEK and QUARTER have no formatter of their own, so they are mapped to a smaller unit:
+ * WEEK -> ( unit=DAY, abs(factor)=7 ); QUARTER -> ( unit=MONTH, abs(factor)=3 ).
+ * <p>
+ * `Map(ChronoField, Count)`:
+ * <p>
+ * `expr` is parsed with the DateTimeFormatter registered for the interval unit, and every field of
+ * the parse result that has a non-zero value is stored in the map.
+ * <p>
+ * `expr` can be supplied in two ways:
+ * 1) interval rowName year -> expression evaluated for each row
+ * 2) interval 2 year -> fixed parameter
+ */
+@Slf4j
+@TransformParser(values = IntervalExpression.class)
+public class IntervalParser implements ValueParser {
+
+    private final String intervalType;
+    private final ValueParser dateParser;
+    private final String parameter;
+
+    private static final List<ChronoField> CHRONO_FIELD_LIST = Arrays.asList(ChronoField.YEAR,
+            ChronoField.MONTH_OF_YEAR,
+            ChronoField.DAY_OF_MONTH, ChronoField.HOUR_OF_DAY, ChronoField.MINUTE_OF_HOUR,
+            ChronoField.SECOND_OF_MINUTE,
+            ChronoField.MICRO_OF_SECOND);
+    private static final Map<String, DateTimeFormatter> DT_FORMATTER_MAP = new ConcurrentHashMap<>();
+
+    static {
+        DT_FORMATTER_MAP.put("SECOND_MICROSECOND", DateTimeFormatter.ofPattern("s.SSSSSS"));
+        DT_FORMATTER_MAP.put("MINUTE_MICROSECOND", DateTimeFormatter.ofPattern("m:s.SSSSSS"));
+        DT_FORMATTER_MAP.put("MINUTE_SECOND", DateTimeFormatter.ofPattern("m:s"));
+        DT_FORMATTER_MAP.put("HOUR_MICROSECOND", DateTimeFormatter.ofPattern("H:m:s.SSSSSS"));
+        DT_FORMATTER_MAP.put("HOUR_SECOND", DateTimeFormatter.ofPattern("H:m:s"));
+        DT_FORMATTER_MAP.put("HOUR_MINUTE", DateTimeFormatter.ofPattern("H:m"));
+        DT_FORMATTER_MAP.put("DAY_MICROSECOND", DateTimeFormatter.ofPattern("d H:m:s.SSSSSS"));
+        DT_FORMATTER_MAP.put("DAY_SECOND", DateTimeFormatter.ofPattern("d H:m:s"));
+        DT_FORMATTER_MAP.put("DAY_MINUTE", DateTimeFormatter.ofPattern("d H:m"));
+        DT_FORMATTER_MAP.put("DAY_HOUR", DateTimeFormatter.ofPattern("d H"));
+        DT_FORMATTER_MAP.put("YEAR_MONTH", DateTimeFormatter.ofPattern("y-M"));
+
+        DT_FORMATTER_MAP.put("MICROSECOND", DateTimeFormatter.ofPattern("SSSSSS"));
+        DT_FORMATTER_MAP.put("SECOND", DateTimeFormatter.ofPattern("s"));
+        DT_FORMATTER_MAP.put("MINUTE", DateTimeFormatter.ofPattern("m"));
+        DT_FORMATTER_MAP.put("HOUR", DateTimeFormatter.ofPattern("H"));
+        DT_FORMATTER_MAP.put("DAY", DateTimeFormatter.ofPattern("d"));
+        DT_FORMATTER_MAP.put("MONTH", DateTimeFormatter.ofPattern("M"));
+        DT_FORMATTER_MAP.put("YEAR", DateTimeFormatter.ofPattern("y"));
+    }
+
+    /**
+     * Builds the parser from the SQL INTERVAL expression.
+     *
+     * @param expr the parsed INTERVAL expression; when it carries no sub-expression parser,
+     *             its literal parameter is used as `expr`
+     */
+    public IntervalParser(IntervalExpression expr) {
+        // Locale.ROOT keeps the lookup keys stable under locales with special casing rules
+        // (e.g. the Turkish dotted/dotless i would turn "minute" into a key that is not registered).
+        intervalType = expr.getIntervalType().toUpperCase(Locale.ROOT);
+        dateParser = OperatorTools.buildParser(expr.getExpression());
+        if (dateParser == null) {
+            parameter = expr.getParameter();
+        } else {
+            parameter = null;
+        }
+    }
+
+    /**
+     * Evaluates the interval for one row.
+     *
+     * @param sourceData decoded source row set
+     * @param rowIndex   index of the row being evaluated
+     * @param context    evaluation context
+     * @return {@code Pair.of(factor, Map<ChronoField, Long>)}, or null when `expr` is null/empty,
+     *         the unit is unknown, or `expr` does not match the unit's format
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        DateTimeFormatter dateTimeFormatter = DT_FORMATTER_MAP.get(intervalType);
+
+        String dataStr = parameter;
+        if (dateParser != null) {
+            Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+            if (dateObj == null) {
+                return null;
+            }
+            dataStr = OperatorTools.parseString(dateObj);
+        }
+        // Nothing to parse: answer null directly instead of failing on charAt(0) below.
+        if (dataStr == null || dataStr.isEmpty()) {
+            return null;
+        }
+
+        int factor = 1;
+        if (dateTimeFormatter == null) {
+            if ("WEEK".equals(intervalType)) {
+                dateTimeFormatter = DT_FORMATTER_MAP.get("DAY");
+                factor = 7;
+            } else if ("QUARTER".equals(intervalType)) {
+                dateTimeFormatter = DT_FORMATTER_MAP.get("MONTH");
+                factor = 3;
+            } else {
+                return null;
+            }
+        }
+
+        // A leading '-' flips the sign of the factor and is not part of the formatted value.
+        if (dataStr.charAt(0) == '-') {
+            factor = -factor;
+            dataStr = dataStr.substring(1);
+        }
+
+        try {
+            TemporalAccessor temporalAccessor = dateTimeFormatter.parse(dataStr);
+            Map<ChronoField, Long> map = new HashMap<>();
+            for (ChronoField field : CHRONO_FIELD_LIST) {
+                // Fields the pattern did not produce are simply absent from the result.
+                if (!temporalAccessor.isSupported(field)) {
+                    continue;
+                }
+                long num = temporalAccessor.getLong(field);
+                if (num != 0) {
+                    map.put(field, num);
+                }
+            }
+            return Pair.of(factor, map);
+        } catch (Exception e) {
+            log.error("Interval parse error", e);
+            return null;
+        }
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
index 140a1dc995..cf32f1694c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
@@ -20,14 +20,17 @@ package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.utils.DateUtil;
import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
+import org.apache.commons.lang3.tuple.Pair;
import java.math.BigDecimal;
+import java.time.temporal.ChronoField;
+import java.util.Map;
/**
* SubtractionParser
- *
*/
@TransformParser(values = Subtraction.class)
public class SubtractionParser implements ValueParser {
@@ -41,16 +44,38 @@ public class SubtractionParser implements ValueParser {
this.right = OperatorTools.buildParser(expr.getRightExpression());
}
- /**
- * parse
- * @param sourceData
- * @param rowIndex
- * @return
- */
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ if (this.left instanceof IntervalParser && this.right instanceof
IntervalParser) {
+ return null;
+ } else if (this.left instanceof IntervalParser || this.right
instanceof IntervalParser) {
+ IntervalParser intervalParser = null;
+ ValueParser dateParser = null;
+ if (this.left instanceof IntervalParser) {
+ intervalParser = (IntervalParser) this.left;
+ dateParser = this.right;
+ } else {
+ intervalParser = (IntervalParser) this.right;
+ dateParser = this.left;
+ }
+ Object intervalPairObj = intervalParser.parse(sourceData,
rowIndex, context);
+ Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+ if (intervalPairObj == null || dateObj == null) {
+ return null;
+ }
+ return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
+ (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj,
-1);
+ } else {
+ return numericalOperation(sourceData, rowIndex, context);
+ }
+ }
+
+ private BigDecimal numericalOperation(SourceData sourceData, int rowIndex,
Context context) {
Object leftObj = this.left.parse(sourceData, rowIndex, context);
Object rightObj = this.right.parse(sourceData, rowIndex, context);
+ if (leftObj == null || rightObj == null) {
+ return null;
+ }
BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
return leftValue.subtract(rightValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
new file mode 100644
index 0000000000..998f838cf2
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.utils;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Date/time arithmetic used to apply a parsed INTERVAL to a date, date-time or time string.
+ */
+public class DateUtil {
+
+    // Need to follow this order
+    private static final List<DateTimeFormatter> DATE_TIME_FORMATTER_LIST = Arrays.asList(
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+    private static final List<DateTimeFormatter> TIME_FORMATTER_LIST = Arrays.asList(
+            DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"), DateTimeFormatter.ofPattern("HH:mm:ss"));
+
+    /**
+     * Time calculation
+     *
+     * @param dateStr      Time parameter string (date, date-time or time)
+     * @param intervalPair Interval parsing results: (factor, field -> count)
+     * @param sign         If the sign is positive or negative, it indicates addition or subtraction
+     * @return Calculation result string, or null when an argument is null, the sign is 0, the string
+     *         matches no supported format, or the interval contains a field the value cannot take
+     */
+    public static String dateAdd(String dateStr, Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign) {
+        if (dateStr == null || intervalPair == null || sign == 0) {
+            return null;
+        }
+        int direction = sign < 0 ? -1 : 1;
+
+        for (DateTimeFormatter formatter : DATE_TIME_FORMATTER_LIST) {
+            LocalDateTime dateTime = tryParseDateTime(dateStr, formatter);
+            if (dateTime != null) {
+                return addDateTime(intervalPair, direction, dateTime, dateStr);
+            }
+        }
+
+        for (DateTimeFormatter formatter : TIME_FORMATTER_LIST) {
+            LocalTime time = tryParseTime(dateStr, formatter);
+            if (time != null) {
+                return addTime(intervalPair, direction, time, dateStr);
+            }
+        }
+
+        return null;
+    }
+
+    // Parses the text as a date-time, falling back to a date at 00:00; null when neither matches.
+    private static LocalDateTime tryParseDateTime(String dateStr, DateTimeFormatter formatter) {
+        try {
+            return LocalDateTime.parse(dateStr, formatter);
+        } catch (Exception notDateTime) {
+            try {
+                return LocalDate.parse(dateStr, formatter).atStartOfDay();
+            } catch (Exception ignored) {
+                // Not this format; the caller tries the next one.
+                return null;
+            }
+        }
+    }
+
+    // Parses the text as a time of day; null when the format does not match.
+    private static LocalTime tryParseTime(String dateStr, DateTimeFormatter formatter) {
+        try {
+            return LocalTime.parse(dateStr, formatter);
+        } catch (Exception ignored) {
+            // Not this format; the caller tries the next one.
+            return null;
+        }
+    }
+
+    // Shifts a date-time by the interval and renders it with the precision of the input/interval.
+    private static String addDateTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign,
+            LocalDateTime dateTime, String dataStr) {
+        int factor = intervalPair.getKey();
+        Map<ChronoField, Long> valueMap = intervalPair.getValue();
+
+        boolean hasTime = dataStr.indexOf(' ') != -1;
+        boolean hasMicroSecond = dataStr.indexOf('.') != -1;
+
+        // Collect the amounts first and apply them in a fixed order afterwards: the map's iteration
+        // order is not defined, and year/month shifts do not commute at month ends.
+        long totalMonths = 0;
+        long days = 0;
+        long hours = 0;
+        long minutes = 0;
+        long seconds = 0;
+        long micros = 0;
+        for (Map.Entry<ChronoField, Long> entry : valueMap.entrySet()) {
+            long amount = entry.getValue() * factor * sign;
+            switch (entry.getKey()) {
+                case MICRO_OF_SECOND:
+                    hasTime = true;
+                    hasMicroSecond = true;
+                    micros += amount;
+                    break;
+                case SECOND_OF_MINUTE:
+                    hasTime = true;
+                    seconds += amount;
+                    break;
+                case MINUTE_OF_HOUR:
+                    hasTime = true;
+                    minutes += amount;
+                    break;
+                case HOUR_OF_DAY:
+                    hasTime = true;
+                    hours += amount;
+                    break;
+                case DAY_OF_MONTH:
+                    days += amount;
+                    break;
+                case MONTH_OF_YEAR:
+                    totalMonths += amount;
+                    break;
+                case YEAR:
+                    // Years and months are combined into one month shift so the day-of-month
+                    // is clamped only once.
+                    totalMonths += amount * 12;
+                    break;
+                default:
+                    return null;
+            }
+        }
+
+        LocalDateTime shifted = dateTime.plusMonths(totalMonths)
+                .plusDays(days)
+                .plusHours(hours)
+                .plusMinutes(minutes)
+                .plusSeconds(seconds)
+                .plusNanos(micros * 1000L);
+
+        String result = shifted.toLocalDate().toString();
+        if (hasTime) {
+            DateTimeFormatter timeFormatter = TIME_FORMATTER_LIST.get(hasMicroSecond ? 0 : 1);
+            result += " " + shifted.toLocalTime().format(timeFormatter);
+        }
+        return result;
+    }
+
+    // Shifts a time of day by the interval; date-based fields cannot be applied and yield null.
+    private static String addTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign, LocalTime time,
+            String dataStr) {
+        int factor = intervalPair.getKey();
+        Map<ChronoField, Long> valueMap = intervalPair.getValue();
+
+        boolean hasMicroSecond = dataStr.indexOf('.') != -1;
+
+        LocalTime shifted = time;
+        for (Map.Entry<ChronoField, Long> entry : valueMap.entrySet()) {
+            long amount = entry.getValue() * factor * sign;
+            switch (entry.getKey()) {
+                case MICRO_OF_SECOND:
+                    hasMicroSecond = true;
+                    shifted = shifted.plusNanos(amount * 1000L);
+                    break;
+                case SECOND_OF_MINUTE:
+                    shifted = shifted.plusSeconds(amount);
+                    break;
+                case MINUTE_OF_HOUR:
+                    shifted = shifted.plusMinutes(amount);
+                    break;
+                case HOUR_OF_DAY:
+                    shifted = shifted.plusHours(amount);
+                    break;
+                default:
+                    return null;
+            }
+        }
+
+        return shifted.format(TIME_FORMATTER_LIST.get(hasMicroSecond ? 0 : 1));
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
index 2534bff096..4fcb436575 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
@@ -41,6 +41,11 @@ public abstract class AbstractParserTestBase {
field.setName("numeric" + i);
srcFields.add(field);
}
+ for (int i = 1; i < 5; i++) {
+ FieldInfo field = new FieldInfo();
+ field.setName("string" + i);
+ srcFields.add(field);
+ }
FieldInfo field = new FieldInfo();
field.setName("result");
dstFields.add(field);
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java
new file mode 100644
index 0000000000..fb27cd4e34
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestAdditionParser extends AbstractParserTestBase {
+
+    // Builds a processor that decodes CSV input and encodes KV output for the given transform SQL.
+    private TransformProcessor<String, String> newProcessor(String transformSql) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        return TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+    }
+
+    // Transforms one input row and checks that exactly one output row with the expected text is produced.
+    private void assertSingleResult(TransformProcessor<String, String> processor, String srcRow,
+            String expected) throws Exception {
+        List<String> output = processor.transform(srcRow, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(expected, output.get(0));
+    }
+
+    @Test
+    public void testAdditionParser() throws Exception {
+        TransformProcessor<String, String> processor =
+                newProcessor("select string1 + INTERVAL string2 SECOND_MICROSECOND from source");
+        // case1: '1992-12-31 23:59:59' + INTERVAL 1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|1.999999",
+                "result=1993-01-01 00:00:00.999999");
+        // case1: '1992-12-31 23:59:59' + INTERVAL -1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-1.999999",
+                "result=1992-12-31 23:59:57.000001");
+        // case2: '1992-12-31' + INTERVAL 1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31|1.999999",
+                "result=1992-12-31 00:00:01.999999");
+        // case2: '1992-12-31' + INTERVAL -1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31|-1.999999",
+                "result=1992-12-30 23:59:58.000001");
+
+        processor = newProcessor("select string1 + INTERVAL string2 YEAR from source");
+        // case3: '1992-12-31 23:59:59' + INTERVAL 1 YEAR
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|1", "result=1993-12-31 23:59:59");
+        // case3: '1992-12-31 23:59:59' + INTERVAL -1 YEAR
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-1", "result=1991-12-31 23:59:59");
+        // case4: '23:59:59' + INTERVAL 1 YEAR
+        assertSingleResult(processor, "||||23:59:59|1", "result=null");
+        // case4: '23:59:59' + INTERVAL -1 YEAR
+        assertSingleResult(processor, "||||23:59:59|-1", "result=null");
+
+        processor = newProcessor("select string1 + INTERVAL string2 WEEK from source");
+        // case5: '1992-12-31 23:59:59' + INTERVAL 13 WEEK
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|13", "result=1993-04-01 23:59:59");
+        // case5: '1992-12-31 23:59:59' + INTERVAL -13 WEEK
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-13", "result=1992-10-01 23:59:59");
+
+        processor = newProcessor("select string1 + INTERVAL string2 QUARTER from source");
+        // case6: '1992-12-31 23:59:59' + INTERVAL 13 QUARTER
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|13", "result=1996-03-31 23:59:59");
+        // case6: '1992-12-31 23:59:59' + INTERVAL -13 QUARTER
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-13", "result=1989-09-30 23:59:59");
+
+        processor = newProcessor("select string1 + INTERVAL xxd QUARTER from source");
+        // case7: '1992-12-31 23:59:59' + INTERVAL null QUARTER
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|13", "result=null");
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java
new file mode 100644
index 0000000000..b7bd4cfb8a
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestSubtractionParser extends AbstractParserTestBase {
+
+    // Builds a processor that decodes CSV input and encodes KV output for the given transform SQL.
+    private TransformProcessor<String, String> newProcessor(String transformSql) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        return TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+    }
+
+    // Transforms one input row and checks that exactly one output row with the expected text is produced.
+    private void assertSingleResult(TransformProcessor<String, String> processor, String srcRow,
+            String expected) throws Exception {
+        List<String> output = processor.transform(srcRow, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(expected, output.get(0));
+    }
+
+    @Test
+    public void testSubtractionParser() throws Exception {
+        TransformProcessor<String, String> processor =
+                newProcessor("select string1 - INTERVAL string2 SECOND_MICROSECOND from source");
+        // case1: '1992-12-31 23:59:59' - INTERVAL 1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|1.999999",
+                "result=1992-12-31 23:59:57.000001");
+        // case1: '1992-12-31 23:59:59' - INTERVAL -1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-1.999999",
+                "result=1993-01-01 00:00:00.999999");
+        // case2: '1992-12-31' - INTERVAL 1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31|1.999999",
+                "result=1992-12-30 23:59:58.000001");
+        // case2: '1992-12-31' - INTERVAL -1.999999 SECOND_MICROSECOND
+        assertSingleResult(processor, "||||1992-12-31|-1.999999",
+                "result=1992-12-31 00:00:01.999999");
+
+        processor = newProcessor("select string1 - INTERVAL string2 YEAR from source");
+        // case3: '1992-12-31 23:59:59' - INTERVAL 1 YEAR
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|1", "result=1991-12-31 23:59:59");
+        // case3: '1992-12-31 23:59:59' - INTERVAL -1 YEAR
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-1", "result=1993-12-31 23:59:59");
+        // case4: '23:59:59' - INTERVAL 1 YEAR
+        assertSingleResult(processor, "||||23:59:59|1", "result=null");
+        // case4: '23:59:59' - INTERVAL -1 YEAR
+        assertSingleResult(processor, "||||23:59:59|-1", "result=null");
+
+        processor = newProcessor("select string1 - INTERVAL string2 WEEK from source");
+        // case5: '1992-12-31 23:59:59' - INTERVAL 13 WEEK
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|13", "result=1992-10-01 23:59:59");
+        // case5: '1992-12-31 23:59:59' - INTERVAL -13 WEEK
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-13", "result=1993-04-01 23:59:59");
+
+        processor = newProcessor("select string1 - INTERVAL string2 QUARTER from source");
+        // case6: '1992-12-31 23:59:59' - INTERVAL 13 QUARTER
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|13", "result=1989-09-30 23:59:59");
+        // case6: '1992-12-31 23:59:59' - INTERVAL -13 QUARTER
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|-13", "result=1996-03-31 23:59:59");
+
+        processor = newProcessor("select string1 - INTERVAL xxd QUARTER from source");
+        // case7: '1992-12-31 23:59:59' - INTERVAL null QUARTER
+        assertSingleResult(processor, "||||1992-12-31 23:59:59|13", "result=null");
+    }
+}