This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9a79697b07 [INLONG-12019][SDK] Transformation supports a caching
mechanism for processing identical function parameters (#12020)
9a79697b07 is described below
commit 9a79697b07cd62c48b83b010358e23235308fecf
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Oct 17 10:52:52 2025 +0800
[INLONG-12019][SDK] Transformation supports a caching mechanism for
processing identical function parameters (#12020)
---
.../inlong/sdk/transform/process/Context.java | 3 ++
.../sdk/transform/process/TransformProcessor.java | 1 +
.../process/function/string/ParseUrlFunction.java | 40 ++++++++++++---
.../process/function/string/UrlDecodeFunction.java | 20 +++++++-
.../process/processor/TestCsv2KvProcessor.java | 59 ++++++++++++++++++++++
5 files changed, 114 insertions(+), 9 deletions(-)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
index 6cfdd4f160..02e82ffbea 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
@@ -89,4 +89,7 @@ public class Context {
return null;
}
+ public Map<String, Object> getRuntimeParams() {
+ return runtimeParams;
+ }
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 13cc17010c..e6392bbe2d 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -180,6 +180,7 @@ public class TransformProcessor<I, O> {
sinkData.addField(fieldName, "");
} else {
sinkData.addField(fieldName, fieldValue.toString());
+ context.put(fieldName, fieldValue);
}
} catch (Throwable t) {
sinkData.addField(fieldName, "");
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java
index e853c59271..501522d694 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java
@@ -57,17 +57,24 @@ public class ParseUrlFunction implements ValueParser {
private ValueParser urlParser;
private ValueParser partParser;
private ValueParser keyParser;
+ private final String exprKey;
public ParseUrlFunction(Function expr) {
List<Expression> params = expr.getParameters().getExpressions();
urlParser = OperatorTools.buildParser(params.get(0));
partParser = params.size() > 1 ?
OperatorTools.buildParser(params.get(1)) : null;
keyParser = params.size() > 2 ?
OperatorTools.buildParser(params.get(2)) : null;
+ exprKey = expr.toString();
}
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Map<String, Object> runtimeParams = context.getRuntimeParams();
+ if (runtimeParams.containsKey(exprKey)) {
+ return runtimeParams.get(exprKey);
+ }
if (urlParser == null || partParser == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
Object urlObj = urlParser.parse(sourceData, rowIndex, context);
@@ -75,6 +82,7 @@ public class ParseUrlFunction implements ValueParser {
Object keyObj = keyParser != null ? keyParser.parse(sourceData,
rowIndex, context) : null;
if (urlObj == null || partObj == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
@@ -82,6 +90,7 @@ public class ParseUrlFunction implements ValueParser {
String part = OperatorTools.parseString(partObj);
String key = keyObj != null ? OperatorTools.parseString(keyObj) : null;
if (keyParser != null && key == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
@@ -95,6 +104,7 @@ public class ParseUrlFunction implements ValueParser {
}
Map<String, String> queryPairs = splitQuery(strQuery);
if (key == null) {
+ runtimeParams.put(exprKey, strQuery);
return strQuery;
}
return queryPairs.getOrDefault(key, "");
@@ -103,23 +113,39 @@ public class ParseUrlFunction implements ValueParser {
URL netUrl = new URL(url);
switch (part) {
case "HOST":
- return netUrl.getHost();
+ String exprValue = netUrl.getHost();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
case "PATH":
- return netUrl.getPath();
+ exprValue = netUrl.getPath();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
case "REF":
- return netUrl.getRef();
+ exprValue = netUrl.getRef();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
case "PROTOCOL":
- return netUrl.getProtocol();
+ exprValue = netUrl.getProtocol();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
case "AUTHORITY":
- return netUrl.getAuthority();
+ exprValue = netUrl.getAuthority();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
case "FILE":
- return netUrl.getFile();
+ exprValue = netUrl.getFile();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
case "USERINFO":
- return netUrl.getUserInfo();
+ exprValue = netUrl.getUserInfo();
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
default:
+ runtimeParams.put(exprKey, null);
return null;
}
} catch (MalformedURLException e) {
+ runtimeParams.put(exprKey, null);
return null;
}
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java
index 9daf27d02f..3be28bad92 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java
@@ -30,6 +30,7 @@ import net.sf.jsqlparser.expression.Function;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Map;
/**
* UrlDecodeFunction -> url_decode(str[, charset])
@@ -53,39 +54,54 @@ public class UrlDecodeFunction implements ValueParser {
private final ValueParser stringParser;
private final ValueParser charsetParser;
+ private final String exprKey;
public UrlDecodeFunction(Function expr) {
List<Expression> params = expr.getParameters().getExpressions();
stringParser = OperatorTools.buildParser(params.get(0));
charsetParser = params.size() > 1 ?
OperatorTools.buildParser(params.get(1)) : null;
+ exprKey = expr.toString();
}
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Map<String, Object> runtimeParams = context.getRuntimeParams();
+ if (runtimeParams.containsKey(exprKey)) {
+ return runtimeParams.get(exprKey);
+ }
Object stringObj = stringParser.parse(sourceData, rowIndex, context);
if (stringObj == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
String string = OperatorTools.parseString(stringObj);
if (string == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
try {
if (charsetParser == null) {
- return URLDecoder.decode(string,
StandardCharsets.UTF_8.toString());
+ String exprValue = URLDecoder.decode(string,
StandardCharsets.UTF_8.toString());
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
} else {
Object charsetObj = charsetParser.parse(sourceData, rowIndex,
context);
if (charsetObj == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
String charset = OperatorTools.parseString(charsetObj);
if (charset == null) {
+ runtimeParams.put(exprKey, null);
return null;
}
- return URLDecoder.decode(string, charset);
+ String exprValue = URLDecoder.decode(string, charset);
+ runtimeParams.put(exprKey, exprValue);
+ return exprValue;
}
} catch (Exception e) {
+ runtimeParams.put(exprKey, null);
return null;
}
}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
index ad2ee5f3bc..7598edf497 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
@@ -139,4 +139,63 @@ public class TestCsv2KvProcessor extends AbstractProcessorTestBase {
Assert.assertEquals(output1.get(0),
"20250101|2025-01-01 01:01:01.001|dt_imp|2025-01-01
01:01:01.001|12345678|123456|android|PJV110;Android 15,level
35|15|OPPO|PJV110|china|guangdong|shenzhen|wifi|12345678|1.2.0.12345|mobileapp|12345678|pg_sgrp_test||search|||MNJT|{\"A88\":\"12345678\",\"A89\":\"12345678\",\"A48\":\"\",\"dt_wxopenid\":\"\",\"dt_seqtime\":\"12345678\",\"app_bld\":\"12345678\",\"A100\":\"12345678\",\"dt_fchlid\":\"\",\"A1\":\"12345678\",\"os_vrsn\":\"Android
15\",\"A3\":\"12345678\",\"dt_mchl [...]
}
+
+ @Test
+ public void testCsv2CsvRuntimesMap() throws Exception {
+ List<FieldInfo> sourceFields = this.getTestFieldList("ftime",
"extinfo", "country", "province", "operator",
+ "apn", "gw", "src_ip_head", "info_str", "product_id",
"app_version", "sdk_id", "sdk_version",
+ "hardware_os", "qua", "upload_ip", "client_ip", "upload_apn",
"event_code", "event_result",
+ "package_size", "consume_time", "event_value", "event_time",
"upload_time");
+ List<FieldInfo> sinkFields = this.getTestFieldList("imp_hour",
"ftime", "event_code", "event_time", "log_id",
+ "qimei36", "platform", "hardware_os", "os_version", "brand",
"model", "country", "province", "city",
+ "network_type", "dt_qq", "app_version", "boundle_id",
"dt_usid", "dt_pgid", "dt_ref_pgid", "dt_eid",
+ "dt_element_lvtm", "dt_lvtm", "product_id", "biz_pub_params",
"udf_kv", "sdk_type", "app_version_num");
+ CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\',
sourceFields);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
+ String transformSql = "select replace(substr(ftime,1,10),'-','') as
imp_hour,"
+ + "url_decode(event_value,'GBK') as decode_event_value,"
+ + "url_decode(hardware_os,'GBK') as decode_hardvalue_os,"
+ + "lower($ctx.decode_hardvalue_os) as lower_hardvalue_os,"
+ + "ftime as ftime,event_code as event_code,"
+ + "event_time as event_time,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A100') as
log_id,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A153') as
qimei36,"
+ + "case when $ctx.lower_hardvalue_os like '%android%' then
'android' when $ctx.lower_hardvalue_os like '%ipad%' then 'ipad' when
$ctx.lower_hardvalue_os like '%iphone%' then 'iphone' when
$ctx.lower_hardvalue_os like '%harmony%' then 'harmony' when
$ctx.lower_hardvalue_os like '%windows%' then 'windows' when
$ctx.lower_hardvalue_os like '%mac%' then 'mac' when $ctx.lower_hardvalue_os
like '%linux%' then 'linux' else 'unknown' end as platform,"
+ + "$ctx.decode_hardvalue_os as hardware_os,"
+ + "trim(case when hardware_os LIKE '%Android%' then
regexp_extract($ctx.decode_hardvalue_os, 'Android(.+),level', 1) when
hardware_os LIKE '%iPhone%' then regexp_extract($ctx.decode_hardvalue_os,
'OS(.+)\\\\(', 1) when hardware_os LIKE '%Harmony%' then
regexp_extract($ctx.decode_hardvalue_os,
'Harmony\\\\s+[^\\\\s]+\\\\s+([^\\\\s]+)\\\\(', 1) else 'unknown' end) as
os_version,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A9') as brand,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A10') as model,"
+ + "country as country,"
+ + "province as province,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A160') as city,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A19') as
network_type,"
+ + "parse_url($ctx.decode_event_value,'QUERY','dt_qq') as
dt_qq,"
+ + "url_decode(app_version,'GBK') as app_version,"
+ + "parse_url($ctx.decode_event_value,'QUERY','A67') as
boundle_id,"
+ + "parse_url($ctx.decode_event_value,'QUERY','dt_usid') as
dt_usid,"
+ + "parse_url($ctx.decode_event_value,'QUERY','dt_pgid') as
dt_pgid,"
+ + "parse_url($ctx.decode_event_value,'QUERY','dt_ref_pgid') as
dt_ref_pgid,"
+ + "parse_url($ctx.decode_event_value,'QUERY','dt_eid') as
dt_eid,"
+ +
"parse_url($ctx.decode_event_value,'QUERY','dt_element_lvtm') as
dt_element_lvtm,"
+ + "parse_url($ctx.decode_event_value,'QUERY','dt_lvtm') as
dt_lvtm,"
+ + "product_id as product_id,"
+ +
"json_remove(str_to_json($ctx.decode_event_value,'&','='),'udf_kv') as
biz_pub_params,"
+ + "parse_url($ctx.decode_event_value,'QUERY','udf_kv') as
udf_kv,"
+ + "case when sdk_id='js' then 1 when sdk_id='weapp' then 2
else 0 end as sdk_type,"
+ +
"split_index(app_version,'\\.',0)*1000+split_index(app_version,'\\.',1)*100+split_index(split_index(app_version,'\\.',2),'\\(',0)
as app_version_num "
+ + "from source where
parse_url(url_decode(event_value,'GBK'),'QUERY','dt_pgid') like 'pg_sgrp_%'";
+ System.out.println(transformSql);
+ TransformConfig config = new TransformConfig(transformSql, new
HashMap<>(), false, true);
+ // case1
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ String sourceData =
+ "2025-01-01
01:01:01.001|extinfo=127.0.0.1|china|guangdong|unite|unknown|unknown|127.0.0.1
2025-01-01
01:01:01.001|INFO|MNJT|1.2.0.12345|js|1.2.3.4-qqvideo6|PJV110%3BAndroid+15%2Clevel+35||127.0.0.1|127.0.0.1|wifi|dt_imp|true|0|0|A9%3DOPPO%26A89%3D12345678%26A76%3D1.2.3.4%26A58%3DN%26A52%3D480%26A17%3D1080*2244%26A12%3Dzh%26A10%3DPJV110%26A158%3D12345678%26A67%3Dmobileapp%26A159%3DN%26A31%3D%2C%2C%26A160%3Dshenzhen%26ui_vrsn%3DPJV%28CN01%29%26udf_kv%3D%7B%22eid%22%3A%22se
[...]
+ List<String> output1 = processor1.transform(sourceData, new
HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ System.out.println(output1.get(0));
+ Assert.assertEquals(output1.get(0),
+ "20250101|2025-01-01 01:01:01.001|dt_imp|2025-01-01
01:01:01.001|12345678|123456|android|PJV110;Android 15,level
35|15|OPPO|PJV110|china|guangdong|shenzhen|wifi|12345678|1.2.0.12345|mobileapp|12345678|pg_sgrp_test||search|||MNJT|{\"A88\":\"12345678\",\"A89\":\"12345678\",\"A48\":\"\",\"dt_wxopenid\":\"\",\"dt_seqtime\":\"12345678\",\"app_bld\":\"12345678\",\"A100\":\"12345678\",\"dt_fchlid\":\"\",\"A1\":\"12345678\",\"os_vrsn\":\"Android
15\",\"A3\":\"12345678\",\"dt_mchl [...]
+ }
}