This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f1285ed4b1 [INLONG-10968][SDK] Support Inlong Transform operator annotation (#10969)
f1285ed4b1 is described below
commit f1285ed4b1c9ec1ff16942c6324f6fad46c6898c
Author: emptyOVO <[email protected]>
AuthorDate: Tue Sep 3 10:33:10 2024 +0800
[INLONG-10968][SDK] Support Inlong Transform operator annotation (#10969)
---
.../transform/process/operator/AndOperator.java | 1 +
.../process/operator/EqualsToOperator.java | 1 +
.../operator/GreaterThanEqualsOperator.java | 1 +
.../process/operator/GreaterThanOperator.java | 1 +
.../process/operator/MinorThanEqualsOperator.java | 1 +
.../process/operator/MinorThanOperator.java | 1 +
.../process/operator/NotEqualsToOperator.java | 1 +
.../transform/process/operator/NotOperator.java | 1 +
.../transform/process/operator/OperatorTools.java | 89 ++++++++++++++--------
.../sdk/transform/process/operator/OrOperator.java | 1 +
.../process/operator/ParenthesisOperator.java | 1 +
.../{NotOperator.java => TransformOperator.java} | 33 ++------
12 files changed, 77 insertions(+), 55 deletions(-)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
index a9dcd42606..f438d4295c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
@@ -26,6 +26,7 @@ import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
* AndOperator
*
*/
+@TransformOperator(values = AndExpression.class)
public class AndOperator implements ExpressionOperator {
private final ExpressionOperator left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
index 709537e8a0..13010c854b 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -27,6 +27,7 @@ import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
* EqualsToOperator
*
*/
+@TransformOperator(values = EqualsTo.class)
public class EqualsToOperator implements ExpressionOperator {
private final ValueParser left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
index 3a53968e10..e703afdbda 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -27,6 +27,7 @@ import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
* GreaterThanEqualsOperator
*
*/
+@TransformOperator(values = GreaterThanEquals.class)
public class GreaterThanEqualsOperator implements ExpressionOperator {
private final ValueParser left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
index a1cd8c2ea2..ba73ef4c4d 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -27,6 +27,7 @@ import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
* GreaterThanOperator
*
*/
+@TransformOperator(values = GreaterThan.class)
public class GreaterThanOperator implements ExpressionOperator {
private final ValueParser left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
index 4248cf1d36..e8104e2cc3 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -27,6 +27,7 @@ import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
* MinorThanEqualsOperator
*
*/
+@TransformOperator(values = MinorThanEquals.class)
public class MinorThanEqualsOperator implements ExpressionOperator {
private final ValueParser left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
index 21ecc0400a..ecae167c5c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -27,6 +27,7 @@ import net.sf.jsqlparser.expression.operators.relational.MinorThan;
* MinorThanOperator
*
*/
+@TransformOperator(values = MinorThan.class)
public class MinorThanOperator implements ExpressionOperator {
private final ValueParser left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
index 98bf102b4f..d3fec5c8c2 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -27,6 +27,7 @@ import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
* NotEqualsToOperator
*
*/
+@TransformOperator(values = NotEqualsTo.class)
public class NotEqualsToOperator implements ExpressionOperator {
private final ValueParser left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
index d8b9ff07e0..306177ffba 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
@@ -26,6 +26,7 @@ import net.sf.jsqlparser.expression.NotExpression;
* NotOperator
*
*/
+@TransformOperator(values = NotExpression.class)
public class NotOperator implements ExpressionOperator {
private final ExpressionOperator node;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index bb35bb4490..9982b37418 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -22,54 +22,83 @@ import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.ParserTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
-import net.sf.jsqlparser.expression.NotExpression;
-import net.sf.jsqlparser.expression.Parenthesis;
-import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
-import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
-import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
-import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
-import net.sf.jsqlparser.expression.operators.relational.MinorThan;
-import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
import org.apache.commons.lang.ObjectUtils;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Set;
/**
* OperatorTools
*
*/
+@Slf4j
public class OperatorTools {
+ private static final String OPERATOR_PATH = "org.apache.inlong.sdk.transform.process.operator";
+
+ private final static Map<Class<?>, Class<?>> operatorMap = Maps.newConcurrentMap();
+
public static final String ROOT_KEY = "$root";
public static final String CHILD_KEY = "$child";
+
+ static {
+ init();
+ }
+
+ private static void init() {
+ Reflections reflections = new Reflections(OPERATOR_PATH, Scanners.TypesAnnotated);
+ Set<Class<?>> clazzSet = reflections.getTypesAnnotatedWith(TransformOperator.class);
+ for (Class<?> clazz : clazzSet) {
+ if (ExpressionOperator.class.isAssignableFrom(clazz)) {
+ TransformOperator annotation = clazz.getAnnotation(TransformOperator.class);
+ if (annotation == null) {
+ continue;
+ }
+ Class<?>[] values = annotation.values();
+ for (Class<?> value : values) {
+ operatorMap.compute(value, (key, former) -> {
+ if (former != null) {
+ log.warn("find a conflict for parser class [{}], the former one is [{}], new one is [{}]",
+ key, former.getName(), clazz.getName());
+ }
+ return clazz;
+ });
+ }
+ }
+ }
+ }
+
+ public static ExpressionOperator getTransformOperator(Expression expr) {
+ Class<?> clazz = operatorMap.get(expr.getClass());
+ if (clazz == null) {
+ return null;
+ }
+ try {
+ Constructor<?> constructor = clazz.getDeclaredConstructor(expr.getClass());
+ return (ExpressionOperator) constructor.newInstance(expr);
+ } catch (NoSuchMethodException e) {
+ log.error("transform operator {} needs one constructor that accept one params whose type is {}",
+ clazz.getName(), expr.getClass().getName(), e);
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static ExpressionOperator buildOperator(Expression expr) {
- if (expr instanceof AndExpression) {
- return new AndOperator((AndExpression) expr);
- } else if (expr instanceof OrExpression) {
- return new OrOperator((OrExpression) expr);
- } else if (expr instanceof Parenthesis) {
- return new ParenthesisOperator((Parenthesis) expr);
- } else if (expr instanceof NotExpression) {
- return new NotOperator((NotExpression) expr);
- } else if (expr instanceof EqualsTo) {
- return new EqualsToOperator((EqualsTo) expr);
- } else if (expr instanceof NotEqualsTo) {
- return new NotEqualsToOperator((NotEqualsTo) expr);
- } else if (expr instanceof GreaterThan) {
- return new GreaterThanOperator((GreaterThan) expr);
- } else if (expr instanceof GreaterThanEquals) {
- return new GreaterThanEqualsOperator((GreaterThanEquals) expr);
- } else if (expr instanceof MinorThan) {
- return new MinorThanOperator((MinorThan) expr);
- } else if (expr instanceof MinorThanEquals) {
- return new MinorThanEqualsOperator((MinorThanEquals) expr);
+ if (expr != null) {
+ return getTransformOperator(expr);
}
return null;
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
index b5de7f279e..9efccba3f2 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
@@ -26,6 +26,7 @@ import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
* OrOperator
*
*/
+@TransformOperator(values = OrExpression.class)
public class OrOperator implements ExpressionOperator {
private final ExpressionOperator left;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
index 0ca1334fce..de71c84b4d 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
@@ -26,6 +26,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
* ParenthesisOperator
*
*/
+@TransformOperator(values = Parenthesis.class)
public class ParenthesisOperator implements ExpressionOperator {
private final ExpressionOperator node;
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java
similarity index 56%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java
index d8b9ff07e0..ee9c676665 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java
@@ -17,32 +17,15 @@
package org.apache.inlong.sdk.transform.process.operator;
-import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.Context;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
-import net.sf.jsqlparser.expression.NotExpression;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
-/**
- * NotOperator
- *
- */
-public class NotOperator implements ExpressionOperator {
-
- private final ExpressionOperator node;
-
- public NotOperator(NotExpression expr) {
- this.node = OperatorTools.buildOperator(expr.getExpression());
- }
-
- /**
- * check
- * @param sourceData
- * @param rowIndex
- * @return
- */
- @Override
- public boolean check(SourceData sourceData, int rowIndex, Context context) {
- return !this.node.check(sourceData, rowIndex, context);
- }
+@Retention(RUNTIME)
+@Target(TYPE)
+public @interface TransformOperator {
+ Class<?>[] values();
}