This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f1285ed4b1 [INLONG-10968][SDK] Support Inlong Transform operator 
annotation (#10969)
f1285ed4b1 is described below

commit f1285ed4b1c9ec1ff16942c6324f6fad46c6898c
Author: emptyOVO <[email protected]>
AuthorDate: Tue Sep 3 10:33:10 2024 +0800

    [INLONG-10968][SDK] Support Inlong Transform operator annotation (#10969)
---
 .../transform/process/operator/AndOperator.java    |  1 +
 .../process/operator/EqualsToOperator.java         |  1 +
 .../operator/GreaterThanEqualsOperator.java        |  1 +
 .../process/operator/GreaterThanOperator.java      |  1 +
 .../process/operator/MinorThanEqualsOperator.java  |  1 +
 .../process/operator/MinorThanOperator.java        |  1 +
 .../process/operator/NotEqualsToOperator.java      |  1 +
 .../transform/process/operator/NotOperator.java    |  1 +
 .../transform/process/operator/OperatorTools.java  | 89 ++++++++++++++--------
 .../sdk/transform/process/operator/OrOperator.java |  1 +
 .../process/operator/ParenthesisOperator.java      |  1 +
 .../{NotOperator.java => TransformOperator.java}   | 33 ++------
 12 files changed, 77 insertions(+), 55 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
index a9dcd42606..f438d4295c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
@@ -26,6 +26,7 @@ import 
net.sf.jsqlparser.expression.operators.conditional.AndExpression;
  * AndOperator
  * 
  */
+@TransformOperator(values = AndExpression.class)
 public class AndOperator implements ExpressionOperator {
 
     private final ExpressionOperator left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
index 709537e8a0..13010c854b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -27,6 +27,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.EqualsTo;
  * EqualsToOperator
  * 
  */
+@TransformOperator(values = EqualsTo.class)
 public class EqualsToOperator implements ExpressionOperator {
 
     private final ValueParser left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
index 3a53968e10..e703afdbda 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -27,6 +27,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
  * GreaterThanEqualsOperator
  * 
  */
+@TransformOperator(values = GreaterThanEquals.class)
 public class GreaterThanEqualsOperator implements ExpressionOperator {
 
     private final ValueParser left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
index a1cd8c2ea2..ba73ef4c4d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -27,6 +27,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.GreaterThan;
  * GreaterThanOperator
  * 
  */
+@TransformOperator(values = GreaterThan.class)
 public class GreaterThanOperator implements ExpressionOperator {
 
     private final ValueParser left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
index 4248cf1d36..e8104e2cc3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -27,6 +27,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
  * MinorThanEqualsOperator
  * 
  */
+@TransformOperator(values = MinorThanEquals.class)
 public class MinorThanEqualsOperator implements ExpressionOperator {
 
     private final ValueParser left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
index 21ecc0400a..ecae167c5c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -27,6 +27,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.MinorThan;
  * MinorThanOperator
  * 
  */
+@TransformOperator(values = MinorThan.class)
 public class MinorThanOperator implements ExpressionOperator {
 
     private final ValueParser left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
index 98bf102b4f..d3fec5c8c2 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -27,6 +27,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
  * NotEqualsToOperator
  * 
  */
+@TransformOperator(values = NotEqualsTo.class)
 public class NotEqualsToOperator implements ExpressionOperator {
 
     private final ValueParser left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
index d8b9ff07e0..306177ffba 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
@@ -26,6 +26,7 @@ import net.sf.jsqlparser.expression.NotExpression;
  * NotOperator
  * 
  */
+@TransformOperator(values = NotExpression.class)
 public class NotOperator implements ExpressionOperator {
 
     private final ExpressionOperator node;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index bb35bb4490..9982b37418 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -22,54 +22,83 @@ import 
org.apache.inlong.sdk.transform.process.parser.ColumnParser;
 import org.apache.inlong.sdk.transform.process.parser.ParserTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
 import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.expression.Function;
-import net.sf.jsqlparser.expression.NotExpression;
-import net.sf.jsqlparser.expression.Parenthesis;
-import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
-import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
-import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
-import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
-import net.sf.jsqlparser.expression.operators.relational.MinorThan;
-import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
 import org.apache.commons.lang.ObjectUtils;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
 
+import java.lang.reflect.Constructor;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * OperatorTools
  *
  */
+@Slf4j
 public class OperatorTools {
 
+    private static final String OPERATOR_PATH = 
"org.apache.inlong.sdk.transform.process.operator";
+
+    private final static Map<Class<?>, Class<?>> operatorMap = 
Maps.newConcurrentMap();
+
     public static final String ROOT_KEY = "$root";
 
     public static final String CHILD_KEY = "$child";
+
+    static {
+        init();
+    }
+
+    private static void init() {
+        Reflections reflections = new Reflections(OPERATOR_PATH, 
Scanners.TypesAnnotated);
+        Set<Class<?>> clazzSet = 
reflections.getTypesAnnotatedWith(TransformOperator.class);
+        for (Class<?> clazz : clazzSet) {
+            if (ExpressionOperator.class.isAssignableFrom(clazz)) {
+                TransformOperator annotation = 
clazz.getAnnotation(TransformOperator.class);
+                if (annotation == null) {
+                    continue;
+                }
+                Class<?>[] values = annotation.values();
+                for (Class<?> value : values) {
+                    operatorMap.compute(value, (key, former) -> {
+                        if (former != null) {
+                            log.warn("find a conflict for parser class [{}], 
the former one is [{}], new one is [{}]",
+                                    key, former.getName(), clazz.getName());
+                        }
+                        return clazz;
+                    });
+                }
+            }
+        }
+    }
+
+    public static ExpressionOperator getTransformOperator(Expression expr) {
+        Class<?> clazz = operatorMap.get(expr.getClass());
+        if (clazz == null) {
+            return null;
+        }
+        try {
+            Constructor<?> constructor = 
clazz.getDeclaredConstructor(expr.getClass());
+            return (ExpressionOperator) constructor.newInstance(expr);
+        } catch (NoSuchMethodException e) {
+            log.error("transform operator {} needs one constructor that accept 
one params whose type is {}",
+                    clazz.getName(), expr.getClass().getName(), e);
+            throw new RuntimeException(e);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static ExpressionOperator buildOperator(Expression expr) {
-        if (expr instanceof AndExpression) {
-            return new AndOperator((AndExpression) expr);
-        } else if (expr instanceof OrExpression) {
-            return new OrOperator((OrExpression) expr);
-        } else if (expr instanceof Parenthesis) {
-            return new ParenthesisOperator((Parenthesis) expr);
-        } else if (expr instanceof NotExpression) {
-            return new NotOperator((NotExpression) expr);
-        } else if (expr instanceof EqualsTo) {
-            return new EqualsToOperator((EqualsTo) expr);
-        } else if (expr instanceof NotEqualsTo) {
-            return new NotEqualsToOperator((NotEqualsTo) expr);
-        } else if (expr instanceof GreaterThan) {
-            return new GreaterThanOperator((GreaterThan) expr);
-        } else if (expr instanceof GreaterThanEquals) {
-            return new GreaterThanEqualsOperator((GreaterThanEquals) expr);
-        } else if (expr instanceof MinorThan) {
-            return new MinorThanOperator((MinorThan) expr);
-        } else if (expr instanceof MinorThanEquals) {
-            return new MinorThanEqualsOperator((MinorThanEquals) expr);
+        if (expr != null) {
+            return getTransformOperator(expr);
         }
         return null;
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
index b5de7f279e..9efccba3f2 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
@@ -26,6 +26,7 @@ import 
net.sf.jsqlparser.expression.operators.conditional.OrExpression;
  * OrOperator
  * 
  */
+@TransformOperator(values = OrExpression.class)
 public class OrOperator implements ExpressionOperator {
 
     private final ExpressionOperator left;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
index 0ca1334fce..de71c84b4d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
@@ -26,6 +26,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
  * ParenthesisOperator
  * 
  */
+@TransformOperator(values = Parenthesis.class)
 public class ParenthesisOperator implements ExpressionOperator {
 
     private final ExpressionOperator node;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java
similarity index 56%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java
index d8b9ff07e0..ee9c676665 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java
@@ -17,32 +17,15 @@
 
 package org.apache.inlong.sdk.transform.process.operator;
 
-import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.Context;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 
-import net.sf.jsqlparser.expression.NotExpression;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
-/**
- * NotOperator
- * 
- */
-public class NotOperator implements ExpressionOperator {
-
-    private final ExpressionOperator node;
-
-    public NotOperator(NotExpression expr) {
-        this.node = OperatorTools.buildOperator(expr.getExpression());
-    }
-
-    /**
-     * check
-     * @param sourceData
-     * @param rowIndex
-     * @return
-     */
-    @Override
-    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
-        return !this.node.check(sourceData, rowIndex, context);
-    }
+@Retention(RUNTIME)
+@Target(TYPE)
+public @interface TransformOperator {
 
+    Class<?>[] values();
 }

Reply via email to