This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ae49243a5a [INLONG-10129][SDK] Transform SQL support +-*/ operations
(#10133)
ae49243a5a is described below
commit ae49243a5a4cbf512acfc0c391ac9995e68b2879
Author: 卢春亮 <[email protected]>
AuthorDate: Tue May 7 19:10:17 2024 +0800
[INLONG-10129][SDK] Transform SQL support +-*/ operations (#10133)
---
.../sdk/transform/process/TransformProcessor.java | 13 +++-
.../process/operator/EqualsToOperator.java | 5 +-
.../operator/GreaterThanEqualsOperator.java | 3 +-
.../process/operator/GreaterThanOperator.java | 3 +-
.../process/operator/MinorThanEqualsOperator.java | 3 +-
.../process/operator/MinorThanOperator.java | 3 +-
.../process/operator/NotEqualsToOperator.java | 5 +-
.../transform/process/operator/OperatorTools.java | 70 +++++++++++++++++++++-
.../AdditionParser.java} | 27 +++++----
.../DivisionParser.java} | 27 +++++----
.../MultiplicationParser.java} | 27 +++++----
.../ParenthesisParser.java} | 26 ++++----
.../SubtractionParser.java} | 27 +++++----
.../transform/process/TestTransformProcessor.java | 26 ++++++++
14 files changed, 191 insertions(+), 74 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 23ca6644fa..c979ef71c4 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -52,6 +52,8 @@ import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.StringReader;
import java.nio.charset.Charset;
@@ -67,6 +69,8 @@ import java.util.Map.Entry;
*/
public class TransformProcessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(TransformProcessor.class);
+
private TransformConfig config;
private SourceDecoder decoder;
private SinkEncoder encoder;
@@ -166,8 +170,13 @@ public class TransformProcessor {
SinkData sinkData = new DefaultSinkData();
for (Entry<String, ValueParser> entry :
this.selectItemMap.entrySet()) {
String fieldName = entry.getKey();
- Object fieldValue = entry.getValue().parse(sourceData, i);
- sinkData.putField(fieldName, String.valueOf(fieldValue));
+ try {
+ Object fieldValue = entry.getValue().parse(sourceData, i);
+ sinkData.putField(fieldName, String.valueOf(fieldValue));
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ sinkData.putField(fieldName, "");
+ }
}
sinkDatas.add(this.encoder.encode(sinkData));
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
index 3172626000..6910e0c9ca 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import org.apache.commons.lang.ObjectUtils;
/**
* EqualsToOperator
@@ -43,9 +42,11 @@ public class EqualsToOperator implements ExpressionOperator {
* @param rowIndex
* @return
*/
+ @SuppressWarnings("rawtypes")
@Override
public boolean check(SourceData sourceData, int rowIndex) {
- return ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
+ (Comparable) this.right.parse(sourceData, rowIndex)) == 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
index 07da9d79c2..eb7689932e 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
-import org.apache.commons.lang.ObjectUtils;
/**
* GreaterThanEqualsOperator
@@ -46,7 +45,7 @@ public class GreaterThanEqualsOperator implements
ExpressionOperator {
@SuppressWarnings("rawtypes")
@Override
public boolean check(SourceData sourceData, int rowIndex) {
- return ObjectUtils.compare((Comparable) this.left.parse(sourceData,
rowIndex),
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
(Comparable) this.right.parse(sourceData, rowIndex)) >= 0;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
index 3b2158d96b..e0db44b1e3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
-import org.apache.commons.lang.ObjectUtils;
/**
* GreaterThanOperator
@@ -46,7 +45,7 @@ public class GreaterThanOperator implements
ExpressionOperator {
@SuppressWarnings("rawtypes")
@Override
public boolean check(SourceData sourceData, int rowIndex) {
- return ObjectUtils.compare((Comparable) this.left.parse(sourceData,
rowIndex),
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
(Comparable) this.right.parse(sourceData, rowIndex)) > 0;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
index fec4ed8019..8b3628ddb7 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
-import org.apache.commons.lang.ObjectUtils;
/**
* MinorThanEqualsOperator
@@ -46,7 +45,7 @@ public class MinorThanEqualsOperator implements
ExpressionOperator {
@SuppressWarnings("rawtypes")
@Override
public boolean check(SourceData sourceData, int rowIndex) {
- return ObjectUtils.compare((Comparable) this.left.parse(sourceData,
rowIndex),
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
(Comparable) this.right.parse(sourceData, rowIndex)) <= 0;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
index 5d9db7dd9c..17baa9cb17 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
-import org.apache.commons.lang.ObjectUtils;
/**
* MinorThanOperator
@@ -46,7 +45,7 @@ public class MinorThanOperator implements ExpressionOperator {
@SuppressWarnings("rawtypes")
@Override
public boolean check(SourceData sourceData, int rowIndex) {
- return ObjectUtils.compare((Comparable) this.left.parse(sourceData,
rowIndex),
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
(Comparable) this.right.parse(sourceData, rowIndex)) < 0;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
index 9c58e70476..dbe185dec5 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
/**
* NotEqualsToOperator
@@ -43,9 +42,11 @@ public class NotEqualsToOperator implements
ExpressionOperator {
* @param rowIndex
* @return
*/
+ @SuppressWarnings("rawtypes")
@Override
public boolean check(SourceData sourceData, int rowIndex) {
- return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
+ (Comparable) this.right.parse(sourceData, rowIndex)) != 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 8afe2f0c74..361ce1aec3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -17,9 +17,14 @@
package org.apache.inlong.sdk.transform.process.operator;
+import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
import org.apache.inlong.sdk.transform.process.parser.LongParser;
+import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser;
+import org.apache.inlong.sdk.transform.process.parser.ParenthesisParser;
import org.apache.inlong.sdk.transform.process.parser.StringParser;
+import org.apache.inlong.sdk.transform.process.parser.SubtractionParser;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.Expression;
@@ -28,6 +33,10 @@ import net.sf.jsqlparser.expression.LongValue;
import net.sf.jsqlparser.expression.NotExpression;
import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+import net.sf.jsqlparser.expression.operators.arithmetic.Division;
+import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
+import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
@@ -37,6 +46,9 @@ import
net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
import net.sf.jsqlparser.schema.Column;
+import org.apache.commons.lang.ObjectUtils;
+
+import java.math.BigDecimal;
/**
* OperatorTools
@@ -44,6 +56,10 @@ import net.sf.jsqlparser.schema.Column;
*/
public class OperatorTools {
+ public static final String ROOT_KEY = "$root";
+
+ public static final String CHILD_KEY = "$child";
+
public static ExpressionOperator buildOperator(Expression expr) {
if (expr instanceof AndExpression) {
return new AndOperator((AndExpression) expr);
@@ -76,9 +92,61 @@ public class OperatorTools {
return new StringParser((StringValue) expr);
} else if (expr instanceof LongValue) {
return new LongParser((LongValue) expr);
+ } else if (expr instanceof Parenthesis) {
+ return new ParenthesisParser((Parenthesis) expr);
+ } else if (expr instanceof Addition) {
+ return new AdditionParser((Addition) expr);
+ } else if (expr instanceof Subtraction) {
+ return new SubtractionParser((Subtraction) expr);
+ } else if (expr instanceof Multiplication) {
+ return new MultiplicationParser((Multiplication) expr);
+ } else if (expr instanceof Division) {
+ return new DivisionParser((Division) expr);
} else if (expr instanceof Function) {
- return new ColumnParser((Function) expr);
+ String exprString = expr.toString();
+ if (exprString.startsWith(ROOT_KEY) ||
exprString.startsWith(CHILD_KEY)) {
+ return new ColumnParser((Function) expr);
+ } else {
+ // TODO
+ }
}
return null;
}
+
+    /**
+     * Converts an operand of an arithmetic or comparison expression to {@link BigDecimal}.
+     *
+     * <p>A {@code BigDecimal} is returned unchanged. Any other value is converted through its
+     * string form; surrounding whitespace is ignored so that padded text such as
+     * {@code " 12 "} is still read as a number.
+     *
+     * @param value the operand to convert
+     * @return the numeric value of {@code value}
+     * @throws NumberFormatException if {@code value} is null or is not a valid decimal number
+     */
+    public static BigDecimal parseBigDecimal(Object value) {
+        if (value instanceof BigDecimal) {
+            return (BigDecimal) value;
+        }
+        if (value == null) {
+            // Fail with an explicit message instead of trying to parse the literal text "null".
+            throw new NumberFormatException("Cannot convert null to BigDecimal");
+        }
+        String text = String.valueOf(value).trim();
+        try {
+            return new BigDecimal(text);
+        } catch (NumberFormatException e) {
+            // Name the offending value so that a logged failure can be diagnosed.
+            NumberFormatException detailed =
+                    new NumberFormatException("Cannot convert '" + text + "' to BigDecimal");
+            detailed.initCause(e);
+            throw detailed;
+        }
+    }
+
+    /**
+     * Compares two operands of a relational expression.
+     *
+     * <p>Two non-null operands of the same class (including two strings, which are compared as
+     * text) are compared with their natural ordering. In every other case string and number
+     * operands are first converted to {@link BigDecimal}, so that a text field, an integer
+     * literal and the {@code BigDecimal} result of an arithmetic expression can be compared
+     * with each other instead of failing with a {@link ClassCastException}. Null handling is
+     * delegated to {@code ObjectUtils.compare}.
+     *
+     * @param left the left operand, may be null
+     * @param right the right operand, may be null
+     * @return a negative integer, zero, or a positive integer as {@code left} is less than,
+     *         equal to, or greater than {@code right}
+     * @throws NumberFormatException if an operand has to be converted and is not numeric
+     */
+    @SuppressWarnings("rawtypes")
+    public static int compareValue(Comparable left, Comparable right) {
+        boolean sameClass = left != null && right != null && left.getClass() == right.getClass();
+        if (sameClass) {
+            // Same type on both sides: the natural ordering is well defined, no conversion needed.
+            return ObjectUtils.compare(left, right);
+        }
+        // Mixed types: bring both sides to BigDecimal so that compareTo receives a matching type.
+        return ObjectUtils.compare(toNumericOperand(left), toNumericOperand(right));
+    }
+
+    /**
+     * Converts a string or number operand to BigDecimal; null and any other type are returned
+     * unchanged.
+     */
+    @SuppressWarnings("rawtypes")
+    private static Comparable toNumericOperand(Comparable value) {
+        if (value instanceof String || value instanceof Number) {
+            return parseBigDecimal(value);
+        }
+        return value;
+    }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
similarity index 62%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
index 9c58e70476..a0f03ab4cd 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
@@ -15,37 +15,42 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+
+import java.math.BigDecimal;
/**
- * NotEqualsToOperator
+ * AdditionParser
*
*/
-public class NotEqualsToOperator implements ExpressionOperator {
+public class AdditionParser implements ValueParser {
private ValueParser left;
+
private ValueParser right;
- public NotEqualsToOperator(NotEqualsTo expr) {
+ public AdditionParser(Addition expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
this.right = OperatorTools.buildParser(expr.getRightExpression());
}
/**
- * check
+ * parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        // Evaluate both operand expressions for this row before converting either of them.
+        final Object augendRaw = this.left.parse(sourceData, rowIndex);
+        final Object addendRaw = this.right.parse(sourceData, rowIndex);
+        final BigDecimal augend = OperatorTools.parseBigDecimal(augendRaw);
+        final BigDecimal addend = OperatorTools.parseBigDecimal(addendRaw);
+        // The sum is returned as a BigDecimal.
+        return augend.add(addend);
     }
-
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
similarity index 61%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
index 9c58e70476..5dc94b6e99 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
@@ -15,37 +15,42 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Division;
+
+import java.math.BigDecimal;
/**
- * NotEqualsToOperator
+ * DivisionParser
*
*/
-public class NotEqualsToOperator implements ExpressionOperator {
+public class DivisionParser implements ValueParser {
private ValueParser left;
+
private ValueParser right;
- public NotEqualsToOperator(NotEqualsTo expr) {
+ public DivisionParser(Division expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
this.right = OperatorTools.buildParser(expr.getRightExpression());
}
/**
- * check
+ * parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        // Evaluate both operand expressions for this row, then convert them to BigDecimal.
+        Object leftObj = this.left.parse(sourceData, rowIndex);
+        Object rightObj = this.right.parse(sourceData, rowIndex);
+        BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
+        BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
+        try {
+            // Exact quotient whenever it has a finite decimal expansion.
+            return leftValue.divide(rightValue);
+        } catch (ArithmeticException e) {
+            // Non-terminating expansion such as 1/3: round to 34 significant digits instead of
+            // failing. With a zero divisor this call throws ArithmeticException again, so
+            // division by zero is still reported to the caller.
+            return leftValue.divide(rightValue, java.math.MathContext.DECIMAL128);
+        }
     }
-
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
similarity index 60%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
index 9c58e70476..7918b434ac 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
@@ -15,37 +15,42 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
+
+import java.math.BigDecimal;
/**
- * NotEqualsToOperator
+ * MultiplicationParser
*
*/
-public class NotEqualsToOperator implements ExpressionOperator {
+public class MultiplicationParser implements ValueParser {
private ValueParser left;
+
private ValueParser right;
- public NotEqualsToOperator(NotEqualsTo expr) {
+ public MultiplicationParser(Multiplication expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
this.right = OperatorTools.buildParser(expr.getRightExpression());
}
/**
- * check
+ * parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        // Evaluate both operand expressions for this row before converting either of them.
+        final Object multiplicandRaw = this.left.parse(sourceData, rowIndex);
+        final Object multiplierRaw = this.right.parse(sourceData, rowIndex);
+        final BigDecimal multiplicand = OperatorTools.parseBigDecimal(multiplicandRaw);
+        final BigDecimal multiplier = OperatorTools.parseBigDecimal(multiplierRaw);
+        // The product is returned as a BigDecimal.
+        return multiplicand.multiply(multiplier);
     }
-
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
similarity index 56%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
index 3172626000..61a2bd1bf3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
@@ -15,37 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
-import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.Parenthesis;
/**
- * EqualsToOperator
+ * ParenthesisParser
*
*/
-public class EqualsToOperator implements ExpressionOperator {
+public class ParenthesisParser implements ValueParser {
- private ValueParser left;
- private ValueParser right;
+ private ValueParser node;
- public EqualsToOperator(EqualsTo expr) {
- this.left = OperatorTools.buildParser(expr.getLeftExpression());
- this.right = OperatorTools.buildParser(expr.getRightExpression());
+    public ParenthesisParser(Parenthesis expr) {
+        // Build the parser for the expression enclosed by the parentheses.
+        final ValueParser inner = OperatorTools.buildParser(expr.getExpression());
+        this.node = inner;
     }
/**
- * check
+ * parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        // Parentheses only group: the value is whatever the enclosed expression yields.
+        final Object innerValue = this.node.parse(sourceData, rowIndex);
+        return innerValue;
     }
-
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
similarity index 61%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
index 9c58e70476..af36c79452 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
@@ -15,37 +15,42 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
+
+import java.math.BigDecimal;
/**
- * NotEqualsToOperator
+ * SubtractionParser
*
*/
-public class NotEqualsToOperator implements ExpressionOperator {
+public class SubtractionParser implements ValueParser {
private ValueParser left;
+
private ValueParser right;
- public NotEqualsToOperator(NotEqualsTo expr) {
+ public SubtractionParser(Subtraction expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
this.right = OperatorTools.buildParser(expr.getRightExpression());
}
/**
- * check
+ * parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        // Evaluate both operand expressions for this row before converting either of them.
+        final Object minuendRaw = this.left.parse(sourceData, rowIndex);
+        final Object subtrahendRaw = this.right.parse(sourceData, rowIndex);
+        final BigDecimal minuend = OperatorTools.parseBigDecimal(minuendRaw);
+        final BigDecimal subtrahend = OperatorTools.parseBigDecimal(subtrahendRaw);
+        // The difference is returned as a BigDecimal.
+        return minuend.subtract(subtrahend);
     }
-
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 282e45edfb..b508f8f2aa 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -269,4 +269,30 @@ public class TestTransformProcessor {
e.printStackTrace();
}
}
+
+    /**
+     * Transforms a protobuf message to CSV with a SQL statement that uses +, -, * and /
+     * (including nested parentheses) in both the select list and the where clause.
+     *
+     * <p>Any exception propagates out of the method so that a failing transform fails the test
+     * instead of being printed and ignored.
+     */
+    @Test
+    public void testPb2CsvForAdd() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        String transformBase64 = this.getPbTestDescription();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        // field2: subtraction inside parentheses followed by division.
+        // field3: multiplication, addition and division mixed, to exercise operator precedence.
+        // where: comparison of a column with the result of an arithmetic expression.
+        String transformSql = "select $root.sid,"
+                + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,"
+                + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)"
+                + "*$root.packageID field3,"
+                + "$root.msgs(0).msg field4 from source "
+                + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
+                + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
+        TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        byte[] srcBytes = this.getPbTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        // Expected value first, so that a failure message reports the values the right way round.
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("sid|2|3426487836002|msgValue4", output.get(0));
+    }
}