This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 70968a9d4b [INLONG-11091][Manager] Manager supports in filter function
configuration (#11094)
70968a9d4b is described below
commit 70968a9d4b26a2d95acb845ae68ce4d0d80706a0
Author: fuweng11 <[email protected]>
AuthorDate: Fri Sep 13 13:10:54 2024 +0800
[INLONG-11091][Manager] Manager supports in filter function configuration
(#11094)
---
.../pojo/sort/util/FilterFunctionUtils.java | 30 +++++++++++++++++++---
.../pojo/transform/TransformDefinition.java | 2 +-
.../pojo/transform/filter/FilterDefinition.java | 2 ++
.../pojo/transform/TransformDefinitionTest.java | 4 +--
.../service/sink/StreamSinkServiceImpl.java | 2 +-
5 files changed, 33 insertions(+), 7 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
index 1b3eadc7f2..c66dccd511 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
@@ -31,16 +31,20 @@ import
org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.FilterRu
import
org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.transformation.CompareOperator;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import
org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
import
org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.InOperator;
import
org.apache.inlong.sort.protocol.transformation.operator.IsNotNullOperator;
import org.apache.inlong.sort.protocol.transformation.operator.IsNullOperator;
import
org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator;
@@ -51,13 +55,17 @@ import
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Util for creat filter function.
*/
+@Slf4j
public class FilterFunctionUtils {
/**
@@ -155,12 +163,26 @@ public class FilterFunctionUtils {
FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
}
OperationType operationType = filterRule.getOperationType();
- SingleValueCompareOperator compareOperator =
parseCompareOperator(operationType);
+ CompareOperator compareOperator = parseCompareOperator(operationType);
TargetValue targetValue = filterRule.getTargetValue();
FunctionParam target = parseTargetValue(targetValue, transformName);
RuleRelation relationWithPost = filterRule.getRelationWithPost();
LogicOperator logicOperator = parseLogicOperator(relationWithPost);
- return new SingleValueFilterFunction(logicOperator, sourceFieldInfo,
compareOperator, target);
+ if (compareOperator instanceof SingleValueCompareOperator) {
+ return new SingleValueFilterFunction(logicOperator,
sourceFieldInfo,
+ (SingleValueCompareOperator) compareOperator, target);
+ } else {
+ List<FunctionParam> targets = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) {
+ for (TargetValue value : filterRule.getTargetValues()) {
+ targets.add(parseTargetValue(value, transformName));
+ }
+ } else {
+ targets.add(target);
+ }
+ return new MultiValueFilterFunction(sourceFieldInfo, targets,
(MultiValueCompareOperator) compareOperator,
+ logicOperator);
+ }
}
private static LogicOperator parseLogicOperator(RuleRelation relation) {
@@ -199,7 +221,7 @@ public class FilterFunctionUtils {
}
}
- private static SingleValueCompareOperator
parseCompareOperator(OperationType operationType) {
+ private static CompareOperator parseCompareOperator(OperationType
operationType) {
switch (operationType) {
case eq:
return EqualOperator.getInstance();
@@ -217,6 +239,8 @@ public class FilterFunctionUtils {
return IsNullOperator.getInstance();
case not_null:
return IsNotNullOperator.getInstance();
+ case in:
+ return InOperator.getInstance();
default:
throw new IllegalArgumentException(String.format("Unsupported
operateType=%s", operationType));
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
index 95c119ef25..f1aafdab01 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
@@ -36,7 +36,7 @@ public abstract class TransformDefinition {
@JsonFormat
public enum OperationType {
- lt, le, eq, ne, ge, gt, is_null, not_null
+ lt, le, eq, ne, ge, gt, is_null, not_null, in
}
@JsonFormat
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
index 9cec3550de..bdd642cadf 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
@@ -105,6 +105,8 @@ public class FilterDefinition extends TransformDefinition {
private TargetValue targetValue;
+ private List<TargetValue> targetValues;
+
private RuleRelation relationWithPost;
}
diff --git
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
index f68bf36f62..4d5cecbad3 100644
---
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
+++
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
@@ -95,9 +95,9 @@ public class TransformDefinitionTest {
private List<FilterRule> createFilterRule() {
List<FilterRule> filterRules = Lists.newArrayList();
filterRules.add(new FilterRule(new StreamField(0,
FieldType.STRING.toString(), "name", null, null),
- OperationType.not_null, null, RuleRelation.OR));
+ OperationType.not_null, null, null, RuleRelation.OR));
filterRules.add(new FilterRule(new StreamField(1,
FieldType.INT.toString(), "age", null, null),
- OperationType.gt, new TargetValue(true, null, "50"), null));
+ OperationType.gt, new TargetValue(true, null, "50"), null,
null));
return filterRules;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 9177253d90..4ce1d1c76c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -481,7 +481,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
this.startProcessForSink(request.getInlongGroupId(),
request.getInlongStreamId(), operator);
}
- LOGGER.info("success to update sink by id: {}", request);
+ LOGGER.info("success to update sink by id: {}", request.getId());
return true;
}