This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 373c3eb846 [INLONG-11961][SDK] Transform supports array index access, 
the WHERE clause supports the LIKE operator, and the str_to_json function 
converts KV-format data into JSON format (#11962)
373c3eb846 is described below

commit 373c3eb846ffa1f2e3c13329cd79b5f2bf65e5e4
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Aug 8 19:18:26 2025 +0800

    [INLONG-11961][SDK] Transform supports array index access, the WHERE clause 
supports the LIKE operator, and the str_to_json function converts KV-format 
data into JSON format (#11962)
---
 .../process/function/string/StrToJsonFunction.java | 116 ++++++++++++++++++++
 .../transform/process/operator/LikeOperator.java   | 117 +++++++++++++++++++++
 .../sdk/transform/process/parser/ArrayParser.java  |  56 ++++++++++
 .../process/processor/TestCsv2KvProcessor.java     |  57 ++++++++++
 4 files changed, 346 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StrToJsonFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StrToJsonFunction.java
new file mode 100644
index 0000000000..26c8d5df1a
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StrToJsonFunction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.gson.JsonObject;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+/**
+ * StrToJsonFunction  ->  str_to_json(str, pairDelimiter, keyValueDelimiter)
+ * description:
+ * - Return NULL if 'str' is NULL
+ * - Return a json object after splitting the 'str' into key/value pairs using 'pairDelimiter'(default is ',')
+ *          and 'keyValueDelimiter'(default is '=')
+ * - Pairs that do not contain the 'keyValueDelimiter' are ignored; a pair is split on the first occurrence
+ *          of the 'keyValueDelimiter' only, so values may contain the delimiter themselves
+ * Note: Both 'pairDelimiter' and 'keyValueDelimiter' are matched literally, so special characters
+ *       (e.g. <([{^-=$!|]})?*+.>) can be used as a delimiter without any escaping.
+ */
+@TransformFunction(type = FunctionConstant.STRING_TYPE, names = {
+        "str_to_json"}, parameter = "(String s1, String pairDelimiter, String keyValueDelimiter)", descriptions = {
+                "- Return NULL if 'str' is NULL;",
+                "- Return a json string after splitting the 'str' into key/value pairs using 'pairDelimiter'(default is ',') "
+                        +
+                        "and 'keyValueDelimiter'(default is '=');",
+                "Note: Both 'pairDelimiter' and 'keyValueDelimiter' are matched literally, so special " +
+                        "characters(e.g. <([{^-=$!|]})?*+.>) can be used as a delimiter without any escaping."
+        }, examples = {
+                "str_to_json('key1=value1,key2=value2,key3=value3') = {\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}",
+                "str_to_json(\"name->John!age->30!city->China\" , \"!\" , \"->\") = {\"name\":\"John\",\"age\":\"30\",\"city\":\"China\"}"
+        })
+public class StrToJsonFunction implements ValueParser {
+
+    private static final String DEFAULT_PAIR_DELIMITER = ",";
+
+    private static final String DEFAULT_KV_DELIMITER = "=";
+
+    private final ValueParser inputParser;
+
+    private final ValueParser pairDelimiterParser;
+
+    private final ValueParser kvDelimiterParser;
+
+    /**
+     * Build the parsers for up to three arguments: the input string, the pair delimiter and the
+     * key/value delimiter. Missing arguments leave the corresponding parser as null.
+     *
+     * @param expr the parsed str_to_json(...) call
+     */
+    public StrToJsonFunction(Function expr) {
+        List<Expression> expressions = null;
+        if (expr.getParameters() != null) {
+            expressions = expr.getParameters().getExpressions();
+        }
+        int size = expressions == null ? 0 : expressions.size();
+        inputParser = size >= 1 ? OperatorTools.buildParser(expressions.get(0)) : null;
+        pairDelimiterParser = size >= 2 ? OperatorTools.buildParser(expressions.get(1)) : null;
+        kvDelimiterParser = size >= 3 ? OperatorTools.buildParser(expressions.get(2)) : null;
+    }
+
+    /**
+     * Evaluate the arguments for the given row and convert the KV-format input into a JsonObject.
+     *
+     * @return the JsonObject built from the input, or null when the input argument is absent or NULL
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (inputParser == null) {
+            return null;
+        }
+        Object inputStringObj = inputParser.parse(sourceData, rowIndex, context);
+        if (inputStringObj == null) {
+            return null;
+        }
+        String pairDelimiterString = parseDelimiter(pairDelimiterParser, sourceData, rowIndex, context);
+        String kvDelimiterString = parseDelimiter(kvDelimiterParser, sourceData, rowIndex, context);
+        String inputString = OperatorTools.parseString(inputStringObj);
+        if (inputString == null) {
+            return null;
+        }
+        return parse2Json(pairDelimiterString, kvDelimiterString, inputString);
+    }
+
+    /**
+     * Evaluate an optional delimiter argument; returns null when the argument is absent or NULL
+     * so that the caller falls back to the default delimiter.
+     */
+    private String parseDelimiter(ValueParser parser, SourceData sourceData, int rowIndex, Context context) {
+        if (parser == null) {
+            return null;
+        }
+        Object delimiterObj = parser.parse(sourceData, rowIndex, context);
+        return delimiterObj == null ? null : OperatorTools.parseString(delimiterObj);
+    }
+
+    /**
+     * Split 'inputString' on the literal pair delimiter and add every pair that contains the literal
+     * key/value delimiter to the result. Plain indexOf scanning is used instead of regex splitting,
+     * so no delimiter character has a special meaning.
+     */
+    private JsonObject parse2Json(String pairDelimiterString, String kvDelimiterString,
+            String inputString) {
+        String pairDelimiter = (pairDelimiterString == null || pairDelimiterString.isEmpty())
+                ? DEFAULT_PAIR_DELIMITER
+                : pairDelimiterString;
+        String keyValueDelimiter = (kvDelimiterString == null || kvDelimiterString.isEmpty())
+                ? DEFAULT_KV_DELIMITER
+                : kvDelimiterString;
+
+        JsonObject json = new JsonObject();
+        int length = inputString.length();
+        int start = 0;
+        while (start <= length) {
+            int end = inputString.indexOf(pairDelimiter, start);
+            if (end < 0) {
+                // no further delimiter: the rest of the input is the last pair
+                end = length;
+            }
+            addPair(json, inputString.substring(start, end), keyValueDelimiter);
+            start = end + pairDelimiter.length();
+        }
+        return json;
+    }
+
+    /**
+     * Add 'pair' to 'json' when it contains the key/value delimiter; the pair is cut at the first
+     * occurrence of the delimiter, everything after it becomes the value.
+     */
+    private void addPair(JsonObject json, String pair, String keyValueDelimiter) {
+        int delimiterIndex = pair.indexOf(keyValueDelimiter);
+        if (delimiterIndex < 0) {
+            return;
+        }
+        String key = pair.substring(0, delimiterIndex);
+        String value = pair.substring(delimiterIndex + keyValueDelimiter.length());
+        json.addProperty(key, value);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/LikeOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/LikeOperator.java
new file mode 100644
index 0000000000..d48ee38869
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/LikeOperator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.common.util.StringUtil;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
+
+import java.util.regex.Pattern;
+
+/**
+ * LikeOperator
+ * Description: evaluates "dest [NOT] LIKE pattern [ESCAPE escape]".
+ * - '%' matches any sequence of characters (including none) and '_' matches exactly one character,
+ *   both including line terminators
+ * - both the pattern and the value are lower-cased before matching, so the comparison ignores case
+ * - the escape character defaults to '\'; it may only precede '_', '%' or itself, except as the last
+ *   character of the pattern where it stands for itself
+ * - the result is false when the value or the pattern is NULL, or when the pattern is illegal
+ */
+@Slf4j
+@TransformOperator(values = LikeExpression.class)
+public class LikeOperator implements ExpressionOperator {
+
+    private static final String REGEX_SPECIAL_CHAR = "[]()|^-+*?{}$\\.";
+    private static final char DEFAULT_ESCAPE_CHAR = '\\';
+
+    private final ValueParser destParser;
+    private final ValueParser patternParser;
+    private final ValueParser escapeParser;
+    private final boolean isNot;
+    // Last compiled pattern; avoids recompiling the regex for every row when the LIKE pattern is constant.
+    private volatile CompiledPattern lastCompiled;
+
+    public LikeOperator(LikeExpression expr) {
+        destParser = OperatorTools.buildParser(expr.getLeftExpression());
+        patternParser = OperatorTools.buildParser(expr.getRightExpression());
+        // The ESCAPE clause is optional
+        escapeParser = expr.getEscape() == null ? null : OperatorTools.buildParser(expr.getEscape());
+        isNot = expr.isNot();
+    }
+
+    /**
+     * Immutable pairing of a LIKE pattern (plus its escape character) with the regex compiled from it.
+     */
+    private static final class CompiledPattern {
+
+        private final String likePattern;
+        private final char escapeChar;
+        private final Pattern regex;
+
+        private CompiledPattern(String likePattern, char escapeChar, Pattern regex) {
+            this.likePattern = likePattern;
+            this.escapeChar = escapeChar;
+            this.regex = regex;
+        }
+    }
+
+    /**
+     * Append 'c' so that it is matched literally, quoting it when it is a regex meta character.
+     */
+    private static void appendLiteral(StringBuilder regexPattern, char c) {
+        if (REGEX_SPECIAL_CHAR.indexOf(c) >= 0) {
+            regexPattern.append('\\');
+        }
+        regexPattern.append(c);
+    }
+
+    /**
+     * Translate a SQL LIKE pattern into a regular expression that has to be compiled with
+     * Pattern.DOTALL.
+     *
+     * @throws IllegalArgumentException if the escape character precedes anything but '_', '%' or itself
+     */
+    private String buildLikeRegex(String pattern, char escapeChar) {
+        int len = pattern.length();
+        StringBuilder regexPattern = new StringBuilder(len + len);
+        for (int i = 0; i < len; i++) {
+            char c = pattern.charAt(i);
+            if (c == escapeChar) {
+                if (i == len - 1) {
+                    // At the end of a string, the escape character represents itself
+                    appendLiteral(regexPattern, c);
+                    continue;
+                }
+                char nextChar = pattern.charAt(i + 1);
+                if (nextChar != '_' && nextChar != '%' && nextChar != escapeChar) {
+                    throw new IllegalArgumentException("Illegal pattern string: " + pattern);
+                }
+                // The escaped character loses its wildcard meaning
+                appendLiteral(regexPattern, nextChar);
+                i++;
+            } else if (c == '_') {
+                regexPattern.append('.');
+            } else if (c == '%') {
+                regexPattern.append(".*");
+            } else {
+                appendLiteral(regexPattern, c);
+            }
+        }
+        return regexPattern.toString();
+    }
+
+    /**
+     * Return the compiled regex for the LIKE pattern, reusing the previous compilation when the
+     * pattern and the escape character did not change.
+     */
+    private Pattern compileLikePattern(String pattern, char escapeChar) {
+        CompiledPattern cached = lastCompiled;
+        if (cached != null && cached.escapeChar == escapeChar && cached.likePattern.equals(pattern)) {
+            return cached.regex;
+        }
+        // DOTALL lets both '_' and '%' match line terminators
+        Pattern regex = Pattern.compile(buildLikeRegex(pattern, escapeChar).toLowerCase(), Pattern.DOTALL);
+        lastCompiled = new CompiledPattern(pattern, escapeChar, regex);
+        return regex;
+    }
+
+    /**
+     * check
+     * @param sourceData the decoded source data
+     * @param rowIndex the index of the row to evaluate
+     * @param context the transform context
+     * @return true if the row satisfies the [NOT] LIKE condition; false otherwise, including when the
+     *         value or the pattern is NULL or the pattern is illegal
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex, Context context) {
+        Object destObj = destParser.parse(sourceData, rowIndex, context);
+        Object patternObj = patternParser.parse(sourceData, rowIndex, context);
+        if (destObj == null || patternObj == null) {
+            return false;
+        }
+        char escapeChr = DEFAULT_ESCAPE_CHAR;
+        if (escapeParser != null) {
+            Object escapeObj = this.escapeParser.parse(sourceData, rowIndex, context);
+            if (!StringUtil.isEmpty(escapeObj)) {
+                String escapeStr = escapeObj.toString();
+                if (!escapeStr.isEmpty()) {
+                    escapeChr = escapeStr.charAt(0);
+                }
+            }
+        }
+        String destStr = destObj.toString();
+        String pattern = patternObj.toString();
+        try {
+            Pattern regex = compileLikePattern(pattern, escapeChr);
+            boolean isMatch = regex.matcher(destStr.toLowerCase()).matches();
+            return isNot != isMatch;
+        } catch (RuntimeException e) {
+            log.error(e.getMessage(), e);
+            return false;
+        }
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ArrayParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ArrayParser.java
new file mode 100644
index 0000000000..9571b0a16a
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ArrayParser.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+
+import net.sf.jsqlparser.expression.ArrayExpression;
+
+import java.util.List;
+
+/**
+ * ArrayParser
+ * Description: Support to get the value from array
+ */
+@TransformParser(values = ArrayExpression.class)
+public class ArrayParser implements ValueParser {
+
+    private final ValueParser left;
+
+    private final ValueParser right;
+
+    public ArrayParser(ArrayExpression expr) {
+        this.left = OperatorTools.buildParser(expr.getObjExpression());
+        this.right = OperatorTools.buildParser(expr.getIndexExpression());
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftValue = this.left.parse(sourceData, rowIndex, context);
+        Object rightValue = this.right.parse(sourceData, rowIndex, context);
+
+        if (leftValue instanceof List<?> && rightValue instanceof Number) {
+            List<?> leftObj = (List<?>) leftValue;
+            Number rightObj = (Number) rightValue;
+            return leftObj.get(rightObj.intValue());
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
index 2b57409714..ad2ee5f3bc 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.process.processor;
 
 import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
 import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
@@ -82,4 +83,60 @@ public class TestCsv2KvProcessor extends 
AbstractProcessorTestBase {
         List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
         Assert.assertEquals(0, output2.size());
     }
+
+    @Test
+    public void testCsv2CsvSplit() throws Exception {
+        List<FieldInfo> sourceFields = this.getTestFieldList("ftime", 
"extinfo", "country", "province", "operator",
+                "apn", "gw", "src_ip_head", "info_str", "product_id", 
"app_version", "sdk_id", "sdk_version",
+                "hardware_os", "qua", "upload_ip", "client_ip", "upload_apn", 
"event_code", "event_result",
+                "package_size", "consume_time", "event_value", "event_time", 
"upload_time");
+        List<FieldInfo> sinkFields = this.getTestFieldList("imp_hour", 
"ftime", "event_code", "event_time", "log_id",
+                "qimei36", "platform", "hardware_os", "os_version", "brand", 
"model", "country", "province", "city",
+                "network_type", "dt_qq", "app_version", "boundle_id", 
"dt_usid", "dt_pgid", "dt_ref_pgid", "dt_eid",
+                "dt_element_lvtm", "dt_lvtm", "product_id", "biz_pub_params", 
"udf_kv", "sdk_type", "app_version_num");
+        CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', 
sourceFields);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
+        String transformSql = "select replace(substr(ftime,1,10),'-','') as 
imp_hour,"
+                + "ftime as ftime,event_code as event_code,"
+                + "event_time as event_time,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A100') as 
log_id,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A153') as 
qimei36,"
+                + "case when lower(url_decode(hardware_os,'GBK')) like 
'%android%' then 'android' when lower(url_decode(hardware_os,'GBK')) like 
'%ipad%' then 'ipad' when lower(url_decode(hardware_os,'GBK')) like '%iphone%' 
then 'iphone' when lower(url_decode(hardware_os,'GBK')) like '%harmony%' then 
'harmony' when lower(url_decode(hardware_os,'GBK')) like '%windows%' then 
'windows' when lower(url_decode(hardware_os,'GBK')) like '%mac%' then 'mac' 
when lower(url_decode(hardware_os,'GBK') [...]
+                + "url_decode(hardware_os,'GBK') as hardware_os,"
+                + "trim(case when hardware_os LIKE '%Android%' then 
regexp_extract(url_decode(hardware_os,'GBK'), 'Android(.+),level', 1) when 
hardware_os LIKE '%iPhone%' then regexp_extract(url_decode(hardware_os,'GBK'), 
'OS(.+)\\\\(', 1) when hardware_os LIKE '%Harmony%' then 
regexp_extract(url_decode(hardware_os,'GBK'), 
'Harmony\\\\s+[^\\\\s]+\\\\s+([^\\\\s]+)\\\\(', 1) else 'unknown' end) as 
os_version,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A9') as 
brand,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A10') as 
model,"
+                + "country as country,"
+                + "province as province,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A160') as 
city,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A19') as 
network_type,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_qq') as 
dt_qq,"
+                + "url_decode(app_version,'GBK') as app_version,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','A67') as 
boundle_id,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_usid') 
as dt_usid,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_pgid') 
as dt_pgid,"
+                + 
"parse_url(url_decode(event_value,'GBK'),'QUERY','dt_ref_pgid') as dt_ref_pgid,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_eid') 
as dt_eid,"
+                + 
"parse_url(url_decode(event_value,'GBK'),'QUERY','dt_element_lvtm') as 
dt_element_lvtm,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_lvtm') 
as dt_lvtm,"
+                + "product_id as product_id,"
+                + 
"json_remove(str_to_json(url_decode(event_value,'GBK'),'&','='),'udf_kv') as 
biz_pub_params,"
+                + "parse_url(url_decode(event_value,'GBK'),'QUERY','udf_kv') 
as udf_kv,"
+                + "case when sdk_id='js' then 1 when sdk_id='weapp' then 2 
else 0 end as sdk_type,"
+                + 
"split_index(app_version,'\\.',0)*1000+split_index(app_version,'\\.',1)*100+split_index(split_index(app_version,'\\.',2),'\\(',0)
 as app_version_num "
+                + "from source where 
parse_url(url_decode(event_value,'GBK'),'QUERY','dt_pgid') like 'pg_sgrp_%'";
+        System.out.println(transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        String sourceData =
+                "2025-01-01 
01:01:01.001|extinfo=127.0.0.1|china|guangdong|unite|unknown|unknown|127.0.0.1 
2025-01-01 
01:01:01.001|INFO|MNJT|1.2.0.12345|js|1.2.3.4-qqvideo6|PJV110%3BAndroid+15%2Clevel+35||127.0.0.1|127.0.0.1|wifi|dt_imp|true|0|0|A9%3DOPPO%26A89%3D12345678%26A76%3D1.2.3.4%26A58%3DN%26A52%3D480%26A17%3D1080*2244%26A12%3Dzh%26A10%3DPJV110%26A158%3D12345678%26A67%3Dmobileapp%26A159%3DN%26A31%3D%2C%2C%26A160%3Dshenzhen%26ui_vrsn%3DPJV%28CN01%29%26udf_kv%3D%7B%22eid%22%3A%22se
 [...]
+        List<String> output1 = processor1.transform(sourceData, new 
HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        System.out.println(output1.get(0));
+        Assert.assertEquals(output1.get(0),
+                "20250101|2025-01-01 01:01:01.001|dt_imp|2025-01-01 
01:01:01.001|12345678|123456|android|PJV110;Android 15,level 
35|15|OPPO|PJV110|china|guangdong|shenzhen|wifi|12345678|1.2.0.12345|mobileapp|12345678|pg_sgrp_test||search|||MNJT|{\"A88\":\"12345678\",\"A89\":\"12345678\",\"A48\":\"\",\"dt_wxopenid\":\"\",\"dt_seqtime\":\"12345678\",\"app_bld\":\"12345678\",\"A100\":\"12345678\",\"dt_fchlid\":\"\",\"A1\":\"12345678\",\"os_vrsn\":\"Android
 15\",\"A3\":\"12345678\",\"dt_mchl [...]
+    }
 }

Reply via email to