This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1518021802 [INLONG-10684][SDK] Inlong transform supports context
(#10702)
1518021802 is described below
commit 15180218024a5bf923761f83abbbc0e53268cd3f
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 23 16:40:27 2024 +0800
[INLONG-10684][SDK] Inlong transform supports context (#10702)
* [INLONG-10684][SDK] Inlong transform supports context
* fix UT
* fix docs
---
.../sdk/transform/decode/CsvSourceDecoder.java | 11 +--
.../sdk/transform/decode/JsonSourceDecoder.java | 18 ++---
.../sdk/transform/decode/KvSourceDecoder.java | 12 ++--
.../sdk/transform/decode/PbSourceDecoder.java | 5 +-
.../inlong/sdk/transform/decode/SourceDecoder.java | 6 +-
.../sdk/transform/encode/CsvSinkEncoder.java | 3 +-
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 3 +-
.../inlong/sdk/transform/encode/SinkEncoder.java | 3 +-
.../inlong/sdk/transform/pojo/TransformConfig.java | 18 +++++
.../inlong/sdk/transform/process/Context.java | 84 ++++++++++++++++++++++
.../sdk/transform/process/TransformProcessor.java | 25 +++++--
.../transform/process/function/AbsFunction.java | 5 +-
.../transform/process/function/ConcatFunction.java | 5 +-
.../transform/process/function/ExpFunction.java | 5 +-
.../sdk/transform/process/function/LnFunction.java | 5 +-
.../transform/process/function/Log10Function.java | 7 +-
.../transform/process/function/Log2Function.java | 7 +-
.../transform/process/function/LogFunction.java | 9 +--
.../transform/process/function/NowFunction.java | 3 +-
.../transform/process/function/PowerFunction.java | 7 +-
.../transform/process/function/SqrtFunction.java | 5 +-
.../transform/process/operator/AndOperator.java | 9 +--
.../process/operator/EqualsToOperator.java | 11 +--
.../process/operator/ExpressionOperator.java | 3 +-
.../operator/GreaterThanEqualsOperator.java | 11 +--
.../process/operator/GreaterThanOperator.java | 11 +--
.../process/operator/MinorThanEqualsOperator.java | 11 +--
.../process/operator/MinorThanOperator.java | 11 +--
.../process/operator/NotEqualsToOperator.java | 11 +--
.../transform/process/operator/NotOperator.java | 7 +-
.../sdk/transform/process/operator/OrOperator.java | 9 +--
.../process/operator/ParenthesisOperator.java | 7 +-
.../transform/process/parser/AdditionParser.java | 11 +--
.../sdk/transform/process/parser/ColumnParser.java | 5 +-
.../transform/process/parser/DivisionParser.java | 7 +-
.../sdk/transform/process/parser/LongParser.java | 5 +-
.../process/parser/MultiplicationParser.java | 11 +--
.../process/parser/ParenthesisParser.java | 7 +-
.../sdk/transform/process/parser/StringParser.java | 5 +-
.../process/parser/SubtractionParser.java | 7 +-
.../sdk/transform/process/parser/ValueParser.java | 3 +-
41 files changed, 279 insertions(+), 129 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index daddfd36d7..d809ac687a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -19,12 +19,13 @@ package org.apache.inlong.sdk.transform.decode;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.Charset;
import java.util.List;
-import java.util.Map;
/**
* CsvSourceDecoder
@@ -53,19 +54,19 @@ public class CsvSourceDecoder implements
SourceDecoder<String> {
}
@Override
- public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+ public SourceData decode(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
- return this.decode(srcString, extParams);
+ return this.decode(srcString, context);
}
@Override
- public SourceData decode(String srcString, Map<String, Object> extParams) {
+ public SourceData decode(String srcString, Context context) {
String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter,
escapeChar, '\"', '\n', true);
CsvSourceData sourceData = new CsvSourceData();
for (int i = 0; i < rowValues.length; i++) {
String[] fieldValues = rowValues[i];
sourceData.addRow();
- if (fields == null || fields.size() == 0) {
+ if (CollectionUtils.isEmpty(fields)) {
for (int j = 0; j < fieldValues.length; j++) {
String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j +
1);
sourceData.putField(fieldName, fieldValues[j]);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index 13c363912a..2d16d92bc1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -18,17 +18,18 @@
package org.apache.inlong.sdk.transform.decode;
import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
* JsonSourceDecoder
@@ -65,26 +66,26 @@ public class JsonSourceDecoder implements
SourceDecoder<String> {
/**
* decode
* @param srcBytes
- * @param extParams
+ * @param context
* @return
*/
@Override
- public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+ public SourceData decode(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
- return this.decode(srcString, extParams);
+ return this.decode(srcString, context);
}
/**
* decode
* @param srcString
- * @param extParams
+ * @param context
* @return
*/
@Override
- public SourceData decode(String srcString, Map<String, Object> extParams) {
+ public SourceData decode(String srcString, Context context) {
JsonObject root = gson.fromJson(srcString, JsonObject.class);
JsonArray childRoot = null;
- if (this.childNodes != null && this.childNodes.size() > 0) {
+ if (CollectionUtils.isNotEmpty(childNodes)) {
JsonElement current = root;
for (JsonNode node : childNodes) {
if (!current.isJsonObject()) {
@@ -117,7 +118,6 @@ public class JsonSourceDecoder implements
SourceDecoder<String> {
}
childRoot = current.getAsJsonArray();
}
- SourceData sourceData = new JsonSourceData(root, childRoot);
- return sourceData;
+ return new JsonSourceData(root, childRoot);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index 77a4fef8b4..db26a4a57d 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -19,7 +19,9 @@ package org.apache.inlong.sdk.transform.decode;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.Charset;
@@ -45,19 +47,19 @@ public class KvSourceDecoder implements
SourceDecoder<String> {
}
@Override
- public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+ public SourceData decode(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
- return this.decode(srcString, extParams);
+ return this.decode(srcString, context);
}
@Override
- public SourceData decode(String srcString, Map<String, Object> extParams) {
+ public SourceData decode(String srcString, Context context) {
List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&',
'=', '\\', '\"', '\n');
KvSourceData sourceData = new KvSourceData();
- if (fields == null || fields.size() == 0) {
+ if (CollectionUtils.isEmpty(fields)) {
for (Map<String, String> row : rowValues) {
sourceData.addRow();
- row.forEach((k, v) -> sourceData.putField(k, v));
+ row.forEach(sourceData::putField);
}
return sourceData;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 6c8a919e24..48f3749c45 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.decode;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
@@ -89,13 +90,13 @@ public class PbSourceDecoder implements
SourceDecoder<byte[]> {
/**
* decode
* @param srcBytes
- * @param extParams
+ * @param context
* @return
* @throws InvalidProtocolBufferException
*/
@SuppressWarnings("unchecked")
@Override
- public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+ public SourceData decode(byte[] srcBytes, Context context) {
try {
// decode
DynamicMessage.Builder builder =
DynamicMessage.newBuilder(rootDesc);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
index 7bbb4dda2d..2e410d24c3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -17,15 +17,15 @@
package org.apache.inlong.sdk.transform.decode;
-import java.util.Map;
+import org.apache.inlong.sdk.transform.process.Context;
/**
* SourceDecoder
*/
public interface SourceDecoder<Input> {
- SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
+ SourceData decode(byte[] srcBytes, Context context);
- SourceData decode(Input input, Map<String, Object> extParams);
+ SourceData decode(Input input, Context context);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 09cae6ea1b..1043f9c2e0 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.commons.lang3.StringUtils;
@@ -57,7 +58,7 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
* @return
*/
@Override
- public String encode(SinkData sinkData) {
+ public String encode(SinkData sinkData, Context context) {
builder.delete(0, builder.length());
if (fields == null || fields.size() == 0) {
if (escapeChar == null) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index be0a7ba980..edf46fcee1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.commons.lang3.StringUtils;
@@ -49,7 +50,7 @@ public class KvSinkEncoder implements SinkEncoder<String> {
* @return
*/
@Override
- public String encode(SinkData sinkData) {
+ public String encode(SinkData sinkData, Context context) {
builder.delete(0, builder.length());
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index 150f1811f1..7f845a99d6 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.encode;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.Context;
import java.util.List;
@@ -26,7 +27,7 @@ import java.util.List;
*/
public interface SinkEncoder<Output> {
- Output encode(SinkData sinkData);
+ Output encode(SinkData sinkData, Context context);
List<FieldInfo> getFields();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
index 71dd71be3b..b73f303233 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -19,6 +19,9 @@ package org.apache.inlong.sdk.transform.pojo;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
/**
* TransformConfig
@@ -28,9 +31,19 @@ public class TransformConfig {
@JsonProperty("transformSql")
private String transformSql;
+ @JsonProperty("configuration")
+ private Map<String, Object> configuration;
+
@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql) {
+ this(transformSql, ImmutableMap.of());
+ }
+
+ @JsonCreator
+ public TransformConfig(@JsonProperty("transformSql") String transformSql,
+ @JsonProperty("configuration") Map<String, Object> configuration) {
this.transformSql = transformSql;
+ this.configuration = configuration;
}
/**
@@ -42,6 +55,11 @@ public class TransformConfig {
return transformSql;
}
+ @JsonProperty("configuration")
+ public Map<String, Object> getConfiguration() {
+ return configuration;
+ }
+
/**
* set transformSql
* @param transformSql the transformSql to set
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
new file mode 100644
index 0000000000..9e4f4b0c11
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Transform context.
+ *
+ * <p>configuration</p> is the global configuration when init transform
processor
+ * <p>extParams</p> is the ext params of each data
+ * <p>runtimeParams</p> is the runtime outputs when processing by each
component
+ *
+ * The priority is runtimeParams > extParams > configuration.
+ */
+public class Context {
+
+ private final Map<String, Object> configuration;
+ private final Map<String, Object> extParams;
+ private final Map<String, Object> runtimeParams;
+
+ public Context(Map<String, Object> configuration, Map<String, Object>
extParams) {
+ this.configuration = configuration;
+ this.extParams = extParams;
+ this.runtimeParams = new ConcurrentHashMap<>();
+ }
+
+ public Object put(String key, Object value) {
+ return runtimeParams.put(key, value);
+ }
+
+ public Object get(String key) {
+ Object obj = runtimeParams.get(key);
+ if (obj != null) {
+ return obj;
+ }
+ obj = extParams.get(key);
+ if (obj != null) {
+ return obj;
+ }
+ return configuration.get(key);
+ }
+
+ public String getString(String key) {
+ Object obj = this.get(key);
+ if (obj != null) {
+ return obj.toString();
+ }
+ return null;
+ }
+
+ public Integer getInteger(String key) {
+ Object obj = this.get(key);
+ if (obj != null) {
+ return Integer.getInteger(obj.toString());
+ }
+ return null;
+ }
+
+ public Long getLong(String key) {
+ Object obj = this.get(key);
+ if (obj != null) {
+ return Long.getLong(obj.toString());
+ }
+ return null;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 0e74180932..9944268dda 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -55,9 +55,9 @@ public class TransformProcessor<I, O> {
private static final Map<String, Object> EMPTY_EXT_PARAMS =
ImmutableMap.of();
- private TransformConfig config;
- private SourceDecoder<I> decoder;
- private SinkEncoder<O> encoder;
+ private final TransformConfig config;
+ private final SourceDecoder<I> decoder;
+ private final SinkEncoder<O> encoder;
private PlainSelect transformSelect;
private ExpressionOperator where;
@@ -119,27 +119,38 @@ public class TransformProcessor<I, O> {
}
public List<O> transform(I input, Map<String, Object> extParams) {
- SourceData sourceData = this.decoder.decode(input, extParams);
+ Context context = new Context(config.getConfiguration(), extParams);
+
+ // decode
+ SourceData sourceData = this.decoder.decode(input, context);
if (sourceData == null) {
return null;
}
+
List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount());
for (int i = 0; i < sourceData.getRowCount(); i++) {
- if (this.where != null && !this.where.check(sourceData, i)) {
+
+ // where check
+ if (this.where != null && !this.where.check(sourceData, i,
context)) {
continue;
}
+
+ // parse value
SinkData sinkData = new DefaultSinkData();
for (Entry<String, ValueParser> entry :
this.selectItemMap.entrySet()) {
String fieldName = entry.getKey();
try {
- Object fieldValue = entry.getValue().parse(sourceData, i);
+ ValueParser parser = entry.getValue();
+ Object fieldValue = parser.parse(sourceData, i, context);
sinkData.addField(fieldName, String.valueOf(fieldValue));
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
sinkData.addField(fieldName, "");
}
}
- sinkDatas.add(this.encoder.encode(sinkData));
+
+ // encode
+ sinkDatas.add(this.encoder.encode(sinkData, context));
}
return sinkDatas;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
index a94d662eae..d2b2ceace6 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -48,8 +49,8 @@ public class AbsFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return numberValue.abs();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
index 2bfe7a5588..529ddad007 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -59,10 +60,10 @@ public class ConcatFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
StringBuilder builder = new StringBuilder();
for (ValueParser node : nodeList) {
- builder.append(node.parse(sourceData, rowIndex));
+ builder.append(node.parse(sourceData, rowIndex, context));
}
return builder.toString();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
index 5f542413e2..5a7a9bbfac 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -48,8 +49,8 @@ public class ExpFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.exp(numberValue.doubleValue());
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
index d5e5ecf80b..530b2ff4f8 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -48,8 +49,8 @@ public class LnFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.log(numberValue.doubleValue());
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
index d390893af2..e968a27aa1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -31,7 +32,7 @@ import java.math.BigDecimal;
*/
public class Log10Function implements ValueParser {
- private ValueParser numberParser;
+ private final ValueParser numberParser;
/**
* Constructor
@@ -48,8 +49,8 @@ public class Log10Function implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.log10(numberValue.doubleValue());
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
index 9c502f25a5..914bc69ac3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -31,7 +32,7 @@ import java.math.BigDecimal;
*/
public class Log2Function implements ValueParser {
- private ValueParser numberParser;
+ private final ValueParser numberParser;
/**
* Constructor
@@ -48,8 +49,8 @@ public class Log2Function implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.log(numberValue.doubleValue()) / Math.log(2);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
index bc0e200255..ddbcd71a97 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -35,7 +36,7 @@ import java.util.List;
public class LogFunction implements ValueParser {
private ValueParser baseParser;
- private ValueParser numberParser;
+ private final ValueParser numberParser;
/**
* Constructor
@@ -59,11 +60,11 @@ public class LogFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
if (baseParser != null) {
- Object baseObj = baseParser.parse(sourceData, rowIndex);
+ Object baseObj = baseParser.parse(sourceData, rowIndex, context);
BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj);
return Math.log(numberValue.doubleValue()) /
Math.log(baseValue.doubleValue());
} else {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
index 930a09af05..3857f22147 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.Function;
@@ -42,7 +43,7 @@ public class NowFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return String.valueOf(System.currentTimeMillis());
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
index 94835f9d7b..938fc00f6d 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -50,9 +51,9 @@ public class PowerFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object baseObj = baseParser.parse(sourceData, rowIndex);
- Object exponentObj = exponentParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object baseObj = baseParser.parse(sourceData, rowIndex, context);
+ Object exponentObj = exponentParser.parse(sourceData, rowIndex,
context);
BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj);
BigDecimal exponentValue = OperatorTools.parseBigDecimal(exponentObj);
return Math.pow(baseValue.doubleValue(), exponentValue.doubleValue());
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
index f9e277acbd..69cf43041a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.function;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
@@ -48,8 +49,8 @@ public class SqrtFunction implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object numberObj = numberParser.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.sqrt(numberValue.doubleValue());
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
index c6464f850d..a9dcd42606 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
@@ -27,8 +28,8 @@ import
net.sf.jsqlparser.expression.operators.conditional.AndExpression;
*/
public class AndOperator implements ExpressionOperator {
- private ExpressionOperator left;
- private ExpressionOperator right;
+ private final ExpressionOperator left;
+ private final ExpressionOperator right;
public AndOperator(AndExpression expr) {
this.left = OperatorTools.buildOperator(expr.getLeftExpression());
@@ -42,8 +43,8 @@ public class AndOperator implements ExpressionOperator {
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return left.check(sourceData, rowIndex) && right.check(sourceData,
rowIndex);
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return left.check(sourceData, rowIndex, context) &&
right.check(sourceData, rowIndex, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
index 6910e0c9ca..709537e8a0 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
@@ -28,8 +29,8 @@ import
net.sf.jsqlparser.expression.operators.relational.EqualsTo;
*/
public class EqualsToOperator implements ExpressionOperator {
- private ValueParser left;
- private ValueParser right;
+ private final ValueParser left;
+ private final ValueParser right;
public EqualsToOperator(EqualsTo expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class EqualsToOperator implements ExpressionOperator {
*/
@SuppressWarnings("rawtypes")
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
- (Comparable) this.right.parse(sourceData, rowIndex)) == 0;
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex, context),
+ (Comparable) this.right.parse(sourceData, rowIndex, context))
== 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
index b055e841e2..8f874b8de0 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
@@ -18,11 +18,12 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
/**
* ExpressionOperator
*/
public interface ExpressionOperator {
- boolean check(SourceData sourceData, int rowIndex);
+ boolean check(SourceData sourceData, int rowIndex, Context context);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
index eb7689932e..3a53968e10 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
@@ -28,8 +29,8 @@ import
net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
*/
public class GreaterThanEqualsOperator implements ExpressionOperator {
- private ValueParser left;
- private ValueParser right;
+ private final ValueParser left;
+ private final ValueParser right;
public GreaterThanEqualsOperator(GreaterThanEquals expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class GreaterThanEqualsOperator implements
ExpressionOperator {
*/
@SuppressWarnings("rawtypes")
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
- (Comparable) this.right.parse(sourceData, rowIndex)) >= 0;
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex, context),
+ (Comparable) this.right.parse(sourceData, rowIndex, context))
>= 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
index e0db44b1e3..a1cd8c2ea2 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
@@ -28,8 +29,8 @@ import
net.sf.jsqlparser.expression.operators.relational.GreaterThan;
*/
public class GreaterThanOperator implements ExpressionOperator {
- private ValueParser left;
- private ValueParser right;
+ private final ValueParser left;
+ private final ValueParser right;
public GreaterThanOperator(GreaterThan expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class GreaterThanOperator implements
ExpressionOperator {
*/
@SuppressWarnings("rawtypes")
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
- (Comparable) this.right.parse(sourceData, rowIndex)) > 0;
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex, context),
+ (Comparable) this.right.parse(sourceData, rowIndex, context))
> 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
index 8b3628ddb7..4248cf1d36 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
@@ -28,8 +29,8 @@ import
net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
*/
public class MinorThanEqualsOperator implements ExpressionOperator {
- private ValueParser left;
- private ValueParser right;
+ private final ValueParser left;
+ private final ValueParser right;
public MinorThanEqualsOperator(MinorThanEquals expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class MinorThanEqualsOperator implements
ExpressionOperator {
*/
@SuppressWarnings("rawtypes")
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
- (Comparable) this.right.parse(sourceData, rowIndex)) <= 0;
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex, context),
+ (Comparable) this.right.parse(sourceData, rowIndex, context))
<= 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
index 17baa9cb17..21ecc0400a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
@@ -28,8 +29,8 @@ import
net.sf.jsqlparser.expression.operators.relational.MinorThan;
*/
public class MinorThanOperator implements ExpressionOperator {
- private ValueParser left;
- private ValueParser right;
+ private final ValueParser left;
+ private final ValueParser right;
public MinorThanOperator(MinorThan expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class MinorThanOperator implements ExpressionOperator {
*/
@SuppressWarnings("rawtypes")
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
- (Comparable) this.right.parse(sourceData, rowIndex)) < 0;
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex, context),
+ (Comparable) this.right.parse(sourceData, rowIndex, context))
< 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
index dbe185dec5..98bf102b4f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
@@ -28,8 +29,8 @@ import
net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
*/
public class NotEqualsToOperator implements ExpressionOperator {
- private ValueParser left;
- private ValueParser right;
+ private final ValueParser left;
+ private final ValueParser right;
public NotEqualsToOperator(NotEqualsTo expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class NotEqualsToOperator implements
ExpressionOperator {
*/
@SuppressWarnings("rawtypes")
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex),
- (Comparable) this.right.parse(sourceData, rowIndex)) != 0;
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return OperatorTools.compareValue((Comparable)
this.left.parse(sourceData, rowIndex, context),
+ (Comparable) this.right.parse(sourceData, rowIndex, context))
!= 0;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
index f648d426e7..d8b9ff07e0 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.NotExpression;
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.NotExpression;
*/
public class NotOperator implements ExpressionOperator {
- private ExpressionOperator node;
+ private final ExpressionOperator node;
public NotOperator(NotExpression expr) {
this.node = OperatorTools.buildOperator(expr.getExpression());
@@ -40,8 +41,8 @@ public class NotOperator implements ExpressionOperator {
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return !this.node.check(sourceData, rowIndex);
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return !this.node.check(sourceData, rowIndex, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
index 33b9f82bdc..b5de7f279e 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
@@ -27,8 +28,8 @@ import
net.sf.jsqlparser.expression.operators.conditional.OrExpression;
*/
public class OrOperator implements ExpressionOperator {
- private ExpressionOperator left;
- private ExpressionOperator right;
+ private final ExpressionOperator left;
+ private final ExpressionOperator right;
public OrOperator(OrExpression expr) {
this.left = OperatorTools.buildOperator(expr.getLeftExpression());
@@ -42,8 +43,8 @@ public class OrOperator implements ExpressionOperator {
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return left.check(sourceData, rowIndex) || right.check(sourceData,
rowIndex);
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return left.check(sourceData, rowIndex, context) ||
right.check(sourceData, rowIndex, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
index 111f6bbb21..0ca1334fce 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.operator;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.Parenthesis;
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
*/
public class ParenthesisOperator implements ExpressionOperator {
- private ExpressionOperator node;
+ private final ExpressionOperator node;
public ParenthesisOperator(Parenthesis expr) {
this.node = OperatorTools.buildOperator(expr.getExpression());
@@ -40,8 +41,8 @@ public class ParenthesisOperator implements
ExpressionOperator {
* @return
*/
@Override
- public boolean check(SourceData sourceData, int rowIndex) {
- return this.node.check(sourceData, rowIndex);
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{
+ return this.node.check(sourceData, rowIndex, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
index a0f03ab4cd..08474fe81f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
@@ -30,9 +31,9 @@ import java.math.BigDecimal;
*/
public class AdditionParser implements ValueParser {
- private ValueParser left;
+ private final ValueParser left;
- private ValueParser right;
+ private final ValueParser right;
public AdditionParser(Addition expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -46,9 +47,9 @@ public class AdditionParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object leftObj = this.left.parse(sourceData, rowIndex);
- Object rightObj = this.right.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object leftObj = this.left.parse(sourceData, rowIndex, context);
+ Object rightObj = this.right.parse(sourceData, rowIndex, context);
BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
return leftValue.add(rightValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index afc58b422e..3a5000a57f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.schema.Column;
@@ -28,7 +29,7 @@ import net.sf.jsqlparser.schema.Column;
*/
public class ColumnParser implements ValueParser {
- private String fieldName;
+ private final String fieldName;
public ColumnParser(Column expr) {
this.fieldName = expr.toString();
@@ -45,7 +46,7 @@ public class ColumnParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return sourceData.getField(rowIndex, fieldName);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
index 5dc94b6e99..61cf1bb82f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import net.sf.jsqlparser.expression.operators.arithmetic.Division;
@@ -46,9 +47,9 @@ public class DivisionParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object leftObj = this.left.parse(sourceData, rowIndex);
- Object rightObj = this.right.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object leftObj = this.left.parse(sourceData, rowIndex, context);
+ Object rightObj = this.right.parse(sourceData, rowIndex, context);
BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
return leftValue.divide(rightValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
index efd61cc2cb..7abb8af77c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.LongValue;
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.LongValue;
*/
public class LongParser implements ValueParser {
- private Long value;
+ private final Long value;
public LongParser(LongValue expr) {
this.value = expr.getValue();
@@ -40,7 +41,7 @@ public class LongParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return value;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
index 7918b434ac..f7299dcf8c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
@@ -30,9 +31,9 @@ import java.math.BigDecimal;
*/
public class MultiplicationParser implements ValueParser {
- private ValueParser left;
+ private final ValueParser left;
- private ValueParser right;
+ private final ValueParser right;
public MultiplicationParser(Multiplication expr) {
this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -46,9 +47,9 @@ public class MultiplicationParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object leftObj = this.left.parse(sourceData, rowIndex);
- Object rightObj = this.right.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object leftObj = this.left.parse(sourceData, rowIndex, context);
+ Object rightObj = this.right.parse(sourceData, rowIndex, context);
BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
return leftValue.multiply(rightValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
index 61a2bd1bf3..1899017087 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import net.sf.jsqlparser.expression.Parenthesis;
@@ -28,7 +29,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
*/
public class ParenthesisParser implements ValueParser {
- private ValueParser node;
+ private final ValueParser node;
public ParenthesisParser(Parenthesis expr) {
this.node = OperatorTools.buildParser(expr.getExpression());
@@ -41,7 +42,7 @@ public class ParenthesisParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- return node.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ return node.parse(sourceData, rowIndex, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
index 9cb431c1fa..9ce0646cc9 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import net.sf.jsqlparser.expression.StringValue;
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.StringValue;
*/
public class StringParser implements ValueParser {
- private String stringValue;
+ private final String stringValue;
public StringParser(StringValue expr) {
this.stringValue = expr.getValue();
@@ -40,7 +41,7 @@ public class StringParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
return stringValue;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
index af36c79452..15d534d50f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
@@ -46,9 +47,9 @@ public class SubtractionParser implements ValueParser {
* @return
*/
@Override
- public Object parse(SourceData sourceData, int rowIndex) {
- Object leftObj = this.left.parse(sourceData, rowIndex);
- Object rightObj = this.right.parse(sourceData, rowIndex);
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object leftObj = this.left.parse(sourceData, rowIndex, context);
+ Object rightObj = this.right.parse(sourceData, rowIndex, context);
BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
return leftValue.subtract(rightValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
index bafafe276c..3b246cfc80 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
/**
* ValueParser
@@ -25,5 +26,5 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
*/
public interface ValueParser {
- Object parse(SourceData sourceData, int rowIndex);
+ Object parse(SourceData sourceData, int rowIndex, Context context);
}