This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new edf93bd547 [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004)
edf93bd547 is described below
commit edf93bd547a09df6b1d7a6d57ea3d719d9f63f4f
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Sep 3 21:49:46 2024 +0800
[INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004)
* [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL
* add more UT Case
* fix code format problems
* fix pom.xml problem
---
.../sdk/transform/encode/CsvSinkEncoder.java | 6 +-
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 6 +-
.../inlong/sdk/transform/encode/SinkEncoder.java | 2 +
.../sdk/transform/process/TransformProcessor.java | 34 +++++++---
.../ValueParserNode.java} | 19 +++---
.../transform/process/TestTransformProcessor.java | 76 ++++++++++++++++++++++
6 files changed, 122 insertions(+), 21 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index ce47a0072c..89f6f364a0 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -66,7 +66,11 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
} else {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
- EscapeUtils.escapeContent(builder, delimiter, escapeChar,
fieldValue);
+ if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
+ builder.append(fieldValue);
+ } else {
+ EscapeUtils.escapeContent(builder, delimiter,
escapeChar, fieldValue);
+ }
builder.append(delimiter);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 7460ec95c2..2822374c41 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -63,7 +63,11 @@ public class KvSinkEncoder implements SinkEncoder<String> {
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
-
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
+ if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
+ builder.append(fieldValue).append(entryDelimiter);
+ } else {
+
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
+ }
}
} else {
for (FieldInfo field : fields) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index 7f845a99d6..a63f970295 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -27,6 +27,8 @@ import java.util.List;
*/
public interface SinkEncoder<Output> {
+ public static final String ALL_SOURCE_FIELD_SIGN = "*";
+
Output encode(SinkData sinkData, Context context);
List<FieldInfo> getFields();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 9944268dda..acb7e62e07 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -31,23 +31,23 @@ import
org.apache.inlong.sdk.transform.process.parser.ValueParser;
import com.google.common.collect.ImmutableMap;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.StringReader;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
/**
* TransformProcessor
- *
+ *
*/
public class TransformProcessor<I, O> {
@@ -61,7 +61,9 @@ public class TransformProcessor<I, O> {
private PlainSelect transformSelect;
private ExpressionOperator where;
- private Map<String, ValueParser> selectItemMap;
+ private List<ValueParserNode> selectItems;
+
+ private boolean includeAllSourceFields = false;
public static <I, O> TransformProcessor<I, O> create(
TransformConfig config,
@@ -91,7 +93,7 @@ public class TransformProcessor<I, O> {
this.transformSelect = (PlainSelect) select.getSelectBody();
this.where =
OperatorTools.buildOperator(this.transformSelect.getWhere());
List<SelectItem> items = this.transformSelect.getSelectItems();
- this.selectItemMap = new HashMap<>(items.size());
+ this.selectItems = new ArrayList<>(items.size());
List<FieldInfo> fields = this.encoder.getFields();
for (int i = 0; i < items.size(); i++) {
SelectItem item = items.get(i);
@@ -108,8 +110,12 @@ public class TransformProcessor<I, O> {
fieldName = exprItem.getAlias().getName();
}
}
- this.selectItemMap.put(fieldName,
- OperatorTools.buildParser(exprItem.getExpression()));
+ this.selectItems
+ .add(new ValueParserNode(fieldName,
OperatorTools.buildParser(exprItem.getExpression())));
+ } else if (item instanceof AllColumns) {
+ fieldName = item.toString();
+ this.encoder.getFields().clear();
+ this.selectItems.add(new ValueParserNode(fieldName, null));
}
}
}
@@ -137,10 +143,18 @@ public class TransformProcessor<I, O> {
// parse value
SinkData sinkData = new DefaultSinkData();
- for (Entry<String, ValueParser> entry :
this.selectItemMap.entrySet()) {
- String fieldName = entry.getKey();
+ for (ValueParserNode node : this.selectItems) {
+ String fieldName = node.getFieldName();
+ ValueParser parser = node.getParser();
+ if (parser == null && StringUtils.equals(fieldName,
SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
+ if (input instanceof String) {
+ sinkData.addField(fieldName, (String) input);
+ } else {
+ sinkData.addField(fieldName, "");
+ }
+ continue;
+ }
try {
- ValueParser parser = entry.getValue();
Object fieldValue = parser.parse(sourceData, i, context);
sinkData.addField(fieldName, String.valueOf(fieldValue));
} catch (Throwable t) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java
similarity index 71%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java
index 7f845a99d6..e36c0c9c6a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.encode;
+package org.apache.inlong.sdk.transform.process;
-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
-import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
/**
- * SinkEncoder
+ * ValueParserNode
*/
-public interface SinkEncoder<Output> {
+@AllArgsConstructor
+@Data
+public class ValueParserNode {
- Output encode(SinkData sinkData, Context context);
-
- List<FieldInfo> getFields();
+ private String fieldName;
+ private ValueParser parser;
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 3413f1aca3..8448260252 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -350,4 +350,80 @@ public class TestTransformProcessor {
List<String> output = processor.transform(srcBytes, new HashMap<>());
Assert.assertEquals(2, output.size());
}
+ @Test
+ public void testCsv2Star() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+ CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\',
fields);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', new
ArrayList<>());
+ String transformSql = "select *";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+
+ List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+ // case2
+ config.setTransformSql("select * from source where extinfo!='ok'");
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+
+ List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
+ Assert.assertEquals(0, output2.size());
+ // case3
+ config.setTransformSql("select *,extinfo,ftime from source where
extinfo!='ok'");
+ TransformProcessor<String, String> processor3 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+
+ List<String> output3 = processor3.transform("2024-04-28 00:00:00|nok",
new HashMap<>());
+ Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "2024-04-28
00:00:00|nok|nok|2024-04-28 00:00:00");
+ // case4
+ CsvSourceInfo csvSourceNoField = new CsvSourceInfo("UTF-8", '|', '\\',
new ArrayList<>());
+ CsvSinkInfo csvSinkNoField = new CsvSinkInfo("UTF-8", '|', '\\', new
ArrayList<>());
+ config.setTransformSql("select *,$2,$1 from source where $2='nok'");
+ TransformProcessor<String, String> processor4 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSourceNoField),
+ SinkEncoderFactory.createCsvEncoder(csvSinkNoField));
+
+ List<String> output4 = processor4.transform("2024-04-28 00:00:00|nok",
new HashMap<>());
+ Assert.assertEquals(1, output4.size());
+ Assert.assertEquals(output4.get(0), "2024-04-28
00:00:00|nok|nok|2024-04-28 00:00:00");
+ }
+
+ @Test
+ public void testKv2Star() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+ KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+ KvSinkInfo kvSink = new KvSinkInfo("UTF-8", new ArrayList<>());
+ String transformSql = "select *";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 = processor1.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "ftime=2024-04-28
00:00:00&extinfo=ok");
+ // case2
+ config.setTransformSql("select * from source where extinfo!='ok'");
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output2 = processor2.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
+ Assert.assertEquals(0, output2.size());
+ // case3
+ config.setTransformSql("select *,extinfo e1,ftime f1 from source where
extinfo!='ok'");
+ TransformProcessor<String, String> processor3 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+
+ List<String> output3 = processor3.transform("ftime=2024-04-28
00:00:00&extinfo=nok", new HashMap<>());
+ Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "ftime=2024-04-28
00:00:00&extinfo=nok&e1=nok&f1=2024-04-28 00:00:00");
+ }
}