This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 714d7af5b6 [INLONG-10109][SDK] Support to transform from Json protocol
to CSV/KV protocol by single SQL (#10110)
714d7af5b6 is described below
commit 714d7af5b619e7cd829bfd26c879baef58b54abd
Author: 卢春亮 <[email protected]>
AuthorDate: Mon May 6 16:18:23 2024 +0800
[INLONG-10109][SDK] Support to transform from Json protocol to CSV/KV
protocol by single SQL (#10110)
---
.../inlong/sdk/transform/decode/CsvSourceData.java | 4 +-
.../decode/{CsvSourceData.java => JsonNode.java} | 59 ++++-----
.../sdk/transform/decode/JsonSourceData.java | 134 +++++++++++++++++++++
.../sdk/transform/decode/JsonSourceDecoder.java | 123 +++++++++++++++++++
.../inlong/sdk/transform/decode/KvSourceData.java | 4 +-
.../sdk/transform/encode/CsvSinkEncoder.java | 8 ++
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 8 ++
.../inlong/sdk/transform/encode/SinkEncoder.java | 6 +
.../sdk/transform/process/TransformProcessor.java | 32 +++--
.../transform/process/operator/OperatorTools.java | 3 +
.../sdk/transform/process/parser/ColumnParser.java | 7 +-
.../transform/process/TestTransformProcessor.java | 81 ++++++++++++-
12 files changed, 421 insertions(+), 48 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
index 77f173f1a6..d4492b4b85 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -51,10 +51,10 @@ public class CsvSourceData implements SourceData {
@Override
public String getField(int rowNum, String fieldName) {
- if (rowNum > this.rows.size()) {
+ if (rowNum >= this.rows.size()) {
return null;
}
- Map<String, String> targetRow = this.rows.get(rowNum - 1);
+ Map<String, String> targetRow = this.rows.get(rowNum);
return targetRow.get(fieldName);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
similarity index 50%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
index 77f173f1a6..e60406162f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
@@ -17,44 +17,35 @@
package org.apache.inlong.sdk.transform.decode;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import lombok.Data;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
/**
- * CsvSourceData
+ * JsonNode
*
*/
-public class CsvSourceData implements SourceData {
-
- private List<Map<String, String>> rows = new ArrayList<>();
-
- private Map<String, String> currentRow;
-
- public CsvSourceData() {
- }
-
- public void putField(String fieldName, String fieldValue) {
- this.currentRow.put(fieldName, fieldValue);
- }
-
- public void addRow() {
- this.currentRow = new HashMap<>();
- rows.add(currentRow);
- }
-
- @Override
- public int getRowCount() {
- return this.rows.size();
- }
-
- @Override
- public String getField(int rowNum, String fieldName) {
- if (rowNum > this.rows.size()) {
- return null;
+@Data
+public class JsonNode {
+
+ private String name;
+ private boolean isArray = false;
+ private int arrayIndex = -1;
+
+ public JsonNode(String nodeString) {
+ int beginIndex = nodeString.indexOf('[');
+ if (beginIndex < 0) {
+ this.name = nodeString;
+ } else {
+ this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
+ int endIndex = nodeString.lastIndexOf(']');
+ if (endIndex >= 0) {
+ this.isArray = true;
+ this.arrayIndex =
NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
+ if (this.arrayIndex < 0) {
+ this.arrayIndex = 0;
+ }
+ }
}
- Map<String, String> targetRow = this.rows.get(rowNum - 1);
- return targetRow.get(fieldName);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
new file mode 100644
index 0000000000..5f92c34965
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Row-oriented view over one decoded JSON message.
+ * <p>
+ * A field name is a dot-separated path. Its first segment selects the starting element:
+ * {@code $root} for the whole message, or {@code $child} for the element of the rows
+ * array at the requested row number. Each later segment is an object key, optionally
+ * marked as an array access with an index (see {@link JsonNode}).
+ */
+public class JsonSourceData implements SourceData {
+
+    public static final String ROOT_KEY = "$root";
+
+    public static final String CHILD_KEY = "$child";
+
+    private final JsonObject root;
+
+    private final JsonArray childRoot;
+
+    /**
+     * Constructor
+     * @param root the decoded message object; may be null
+     * @param childRoot the array whose elements are the rows, or null when there is none
+     */
+    public JsonSourceData(JsonObject root, JsonArray childRoot) {
+        this.root = root;
+        this.childRoot = childRoot;
+    }
+
+    /**
+     * getRowCount
+     * @return the size of the rows array, or 1 when there is no rows array
+     */
+    @Override
+    public int getRowCount() {
+        if (this.childRoot == null) {
+            return 1;
+        }
+        return this.childRoot.size();
+    }
+
+    /**
+     * getField
+     * @param rowNum zero-based row number; only used by paths that start with {@code $child}
+     * @param fieldName dot-separated path starting with {@code $root} or {@code $child}
+     * @return the string value of the element the path resolves to, or an empty string
+     *         when the path cannot be resolved or the element has no string form
+     */
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        if (fieldName == null) {
+            // no path to resolve
+            return "";
+        }
+        try {
+            String[] nodeStrings = fieldName.split("\\.");
+            if (nodeStrings.length == 0) {
+                return "";
+            }
+            List<JsonNode> childNodes = new ArrayList<>(nodeStrings.length);
+            for (String nodeString : nodeStrings) {
+                childNodes.add(new JsonNode(nodeString));
+            }
+            // first node selects the element the rest of the path starts from
+            String firstName = childNodes.get(0).getName();
+            JsonElement current;
+            if (StringUtils.equals(ROOT_KEY, firstName)) {
+                current = this.root;
+            } else if (StringUtils.equals(CHILD_KEY, firstName)) {
+                // check explicitly instead of relying on the catch below: there may be
+                // no rows array at all, and the row number may be out of range
+                if (this.childRoot == null || rowNum < 0 || rowNum >= this.childRoot.size()) {
+                    return "";
+                }
+                current = this.childRoot.get(rowNum);
+            } else {
+                // error data
+                return "";
+            }
+            if (current == null) {
+                // error data
+                return "";
+            }
+            // parse other node
+            for (int i = 1; i < childNodes.size(); i++) {
+                JsonNode node = childNodes.get(i);
+                if (!current.isJsonObject()) {
+                    // error data
+                    return "";
+                }
+                JsonElement newElement = current.getAsJsonObject().get(node.getName());
+                if (newElement == null) {
+                    // error data
+                    return "";
+                }
+                if (!node.isArray()) {
+                    current = newElement;
+                    continue;
+                }
+                if (!newElement.isJsonArray()) {
+                    // error data
+                    return "";
+                }
+                JsonArray newArray = newElement.getAsJsonArray();
+                if (node.getArrayIndex() >= newArray.size()) {
+                    // error data
+                    return "";
+                }
+                current = newArray.get(node.getArrayIndex());
+            }
+            return current.getAsString();
+        } catch (Exception ignored) {
+            // Last resort so that a field lookup never throws: covers a malformed path
+            // segment and elements for which getAsString() is not supported.
+            return "";
+        }
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
new file mode 100644
index 0000000000..57bf6a9982
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decodes a JSON message into a {@link JsonSourceData}.
+ * <p>
+ * When the source info carries a non-blank rows node path, the path is resolved against
+ * every decoded message and the array it points to becomes the rows of the result.
+ */
+public class JsonSourceDecoder implements SourceDecoder {
+
+    protected JsonSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private String rowsNodePath;
+    private List<JsonNode> childNodes;
+
+    private Gson gson = new Gson();
+
+    /**
+     * Constructor
+     * @param sourceInfo supplies the charset name (blank keeps the platform default)
+     *        and the dot-separated path of the rows array (blank means no rows array)
+     */
+    public JsonSourceDecoder(JsonSourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+        if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+            this.srcCharset = Charset.forName(sourceInfo.getCharset());
+        }
+        this.rowsNodePath = sourceInfo.getRowsNodePath();
+        if (!StringUtils.isBlank(rowsNodePath)) {
+            // split the path once here so decode() does not re-parse it per message
+            this.childNodes = new ArrayList<>();
+            String[] nodeStrings = this.rowsNodePath.split("\\.");
+            for (String nodeString : nodeStrings) {
+                this.childNodes.add(new JsonNode(nodeString));
+            }
+        }
+    }
+
+    /**
+     * decode
+     * @param srcBytes the message bytes, converted to text with the configured charset
+     * @param extParams passed through to {@link #decode(String, Map)}
+     * @return the decoded source data
+     */
+    @Override
+    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+        String srcString = new String(srcBytes, srcCharset);
+        return this.decode(srcString, extParams);
+    }
+
+    /**
+     * decode
+     * @param srcString the JSON text of one message
+     * @param extParams not used by this decoder
+     * @return source data wrapping the parsed root; its rows array is null when no rows
+     *         path is configured or the path does not lead to a JSON array
+     */
+    @Override
+    public SourceData decode(String srcString, Map<String, Object> extParams) {
+        JsonObject root = gson.fromJson(srcString, JsonObject.class);
+        return new JsonSourceData(root, this.findRowsArray(root));
+    }
+
+    /**
+     * Walks the configured rows path from the root and returns the array it ends at.
+     * @param root the parsed message; null when the source text was null or empty
+     * @return the rows array, or null when the root is null, no path is configured, or
+     *         any step of the path is missing or of the wrong JSON type
+     */
+    private JsonArray findRowsArray(JsonObject root) {
+        // Gson yields a null root for null/empty text; treat it as error data
+        // instead of dereferencing it below.
+        if (root == null || this.childNodes == null || this.childNodes.isEmpty()) {
+            return null;
+        }
+        JsonElement current = root;
+        for (JsonNode node : this.childNodes) {
+            if (!current.isJsonObject()) {
+                // error data
+                return null;
+            }
+            JsonElement newElement = current.getAsJsonObject().get(node.getName());
+            if (newElement == null) {
+                // error data
+                return null;
+            }
+            if (!node.isArray()) {
+                current = newElement;
+                continue;
+            }
+            if (!newElement.isJsonArray()) {
+                // error data
+                return null;
+            }
+            JsonArray newArray = newElement.getAsJsonArray();
+            if (node.getArrayIndex() >= newArray.size()) {
+                // error data
+                return null;
+            }
+            current = newArray.get(node.getArrayIndex());
+        }
+        if (!current.isJsonArray()) {
+            // error data
+            return null;
+        }
+        return current.getAsJsonArray();
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
index 3e3f600197..e260ae15f4 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
@@ -50,10 +50,10 @@ public class KvSourceData implements SourceData {
@Override
public String getField(int rowNum, String fieldName) {
- if (rowNum > this.rows.size()) {
+ if (rowNum >= this.rows.size()) {
return null;
}
- Map<String, String> targetRow = this.rows.get(rowNum - 1);
+ Map<String, String> targetRow = this.rows.get(rowNum);
return targetRow.get(fieldName);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index cb3c9405a0..a991d137b9 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -74,4 +74,12 @@ public class CsvSinkEncoder implements SinkEncoder {
}
return builder.substring(0, builder.length() - 1);
}
+
+ /**
+  * get fields
+  * @return the {@code fields} list held by this encoder
+  */
+ @Override
+ public List<FieldInfo> getFields() {
+     return fields;
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 3fcc31107e..56f35ec319 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -61,4 +61,12 @@ public class KvSinkEncoder implements SinkEncoder {
}
return builder.substring(0, builder.length() - 1);
}
+
+ /**
+  * get fields
+  * @return the {@code fields} list held by this encoder
+  */
+ @Override
+ public List<FieldInfo> getFields() {
+     return fields;
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index ab83d21a0b..3839da0160 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -17,10 +17,16 @@
package org.apache.inlong.sdk.transform.encode;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import java.util.List;
+
/**
* SinkEncoder
*/
public interface SinkEncoder {
String encode(SinkData sinkData);
+
+ List<FieldInfo> getFields();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index c08a6b3bc5..ad4c4b4304 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.process;
import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.JsonSourceDecoder;
import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
@@ -28,6 +29,8 @@ import org.apache.inlong.sdk.transform.encode.SinkData;
import org.apache.inlong.sdk.transform.encode.SinkEncoder;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.SinkInfo;
@@ -103,6 +106,8 @@ public class TransformProcessor {
this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo);
} else if (sourceInfo instanceof KvSourceInfo) {
this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
+ } else if (sourceInfo instanceof JsonSourceInfo) {
+ this.decoder = new JsonSourceDecoder((JsonSourceInfo) sourceInfo);
}
}
@@ -122,24 +127,35 @@ public class TransformProcessor {
this.where =
OperatorTools.buildOperator(this.transformSelect.getWhere());
List<SelectItem> items = this.transformSelect.getSelectItems();
this.selectItemMap = new HashMap<>(items.size());
- for (SelectItem item : items) {
+ List<FieldInfo> fields = this.encoder.getFields();
+ for (int i = 0; i < items.size(); i++) {
+ SelectItem item = items.get(i);
+ String fieldName = null;
+ if (i < fields.size()) {
+ fieldName = fields.get(i).getName();
+ }
if (item instanceof SelectExpressionItem) {
SelectExpressionItem exprItem = (SelectExpressionItem) item;
- if (exprItem.getAlias() == null) {
- this.selectItemMap.put(exprItem.toString(),
-
OperatorTools.buildParser(exprItem.getExpression()));
- } else {
- this.selectItemMap.put(exprItem.getAlias().getName(),
-
OperatorTools.buildParser(exprItem.getExpression()));
+ if (fieldName == null) {
+ if (exprItem.getAlias() == null) {
+ fieldName = exprItem.toString();
+ } else {
+ fieldName = exprItem.getAlias().getName();
+ }
}
+ this.selectItemMap.put(fieldName,
+ OperatorTools.buildParser(exprItem.getExpression()));
}
}
}
public List<String> transform(byte[] srcBytes, Map<String, Object>
extParams) {
SourceData sourceData = this.decoder.decode(srcBytes, extParams);
+ if (sourceData == null) {
+ return null;
+ }
List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount());
- for (int i = 1; i <= sourceData.getRowCount(); i++) {
+ for (int i = 0; i < sourceData.getRowCount(); i++) {
if (this.where != null && !this.where.check(sourceData, i)) {
continue;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index c0e059f266..8afe2f0c74 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.sdk.transform.process.parser.StringParser;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.expression.LongValue;
import net.sf.jsqlparser.expression.NotExpression;
import net.sf.jsqlparser.expression.Parenthesis;
@@ -75,6 +76,8 @@ public class OperatorTools {
return new StringParser((StringValue) expr);
} else if (expr instanceof LongValue) {
return new LongParser((LongValue) expr);
+ } else if (expr instanceof Function) {
+ return new ColumnParser((Function) expr);
}
return null;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index b6dd579d2a..1216cfff74 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.process.parser;
import org.apache.inlong.sdk.transform.decode.SourceData;
+import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.schema.Column;
/**
@@ -30,7 +31,11 @@ public class ColumnParser implements ValueParser {
private String fieldName;
public ColumnParser(Column expr) {
- this.fieldName = expr.getColumnName();
+ this.fieldName = expr.toString();
+ }
+
+ /**
+  * Builds a column parser from a function-call expression: the field name is the
+  * expression's text with every '(' replaced by '[' and every ')' replaced by ']'.
+  * @param expr the function-call expression to take the field name from
+  */
+ public ColumnParser(Function expr) {
+     String callText = expr.toString();
+     String bracketed = callText.replace('(', '[');
+     this.fieldName = bracketed.replace(')', ']');
+ }
/**
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 9c75456df6..498fad7f79 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.transform.process;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.SinkInfo;
@@ -69,7 +70,7 @@ public class TestTransformProcessor {
}
@Test
- public void testKvCsv() {
+ public void testKv2Csv() {
try {
List<FieldInfo> fields = new ArrayList<>();
FieldInfo ftime = new FieldInfo();
@@ -97,6 +98,84 @@ public class TestTransformProcessor {
}
}
+ /**
+  * Transforms a JSON message whose rows live under "msgs" into one CSV line per row,
+  * mixing fields of the root object with fields of each row.
+  * @throws Exception any failure of the processor fails the test instead of being printed
+  */
+ @Test
+ public void testJson2Csv() throws Exception {
+     List<FieldInfo> fields = new ArrayList<>();
+     FieldInfo sid = new FieldInfo();
+     sid.setName("sid");
+     fields.add(sid);
+     FieldInfo packageID = new FieldInfo();
+     packageID.setName("packageID");
+     fields.add(packageID);
+     FieldInfo msgTime = new FieldInfo();
+     msgTime.setName("msgTime");
+     fields.add(msgTime);
+     FieldInfo msg = new FieldInfo();
+     msg.setName("msg");
+     fields.add(msg);
+     SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+     SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+     String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+     TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql);
+     // case1
+     TransformProcessor processor = new TransformProcessor(config);
+     String srcString = "{\n"
+             + " \"sid\":\"value1\",\n"
+             + " \"packageID\":\"value2\",\n"
+             + " \"msgs\":[\n"
+             + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+             + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+             + " ]\n"
+             + "}";
+     List<String> output = processor.transform(srcString, new HashMap<>());
+     // expected value first, so a failure message reports the values the right way round
+     Assert.assertEquals(2, output.size());
+     Assert.assertEquals("value1|value2|1713243918000|value4", output.get(0));
+     Assert.assertEquals("value1|value2|1713243918000|v4", output.get(1));
+ }
+
+ /**
+  * Transforms a JSON message without a rows path into a single CSV line, addressing
+  * array elements of the root object by index in the SQL.
+  * @throws Exception any failure of the processor fails the test instead of being printed
+  */
+ @Test
+ public void testJson2CsvForOne() throws Exception {
+     List<FieldInfo> fields = new ArrayList<>();
+     FieldInfo sid = new FieldInfo();
+     sid.setName("sid");
+     fields.add(sid);
+     FieldInfo packageID = new FieldInfo();
+     packageID.setName("packageID");
+     fields.add(packageID);
+     FieldInfo msgTime = new FieldInfo();
+     msgTime.setName("msgTime");
+     fields.add(msgTime);
+     FieldInfo msg = new FieldInfo();
+     msg.setName("msg");
+     fields.add(msg);
+     SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
+     SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+     String transformSql =
+             "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+     TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql);
+     // case1
+     TransformProcessor processor = new TransformProcessor(config);
+     String srcString = "{\n"
+             + " \"sid\":\"value1\",\n"
+             + " \"packageID\":\"value2\",\n"
+             + " \"msgs\":[\n"
+             + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+             + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+             + " ]\n"
+             + "}";
+     List<String> output = processor.transform(srcString, new HashMap<>());
+     // expected value first, so a failure message reports the values the right way round
+     Assert.assertEquals(1, output.size());
+     Assert.assertEquals("value1|value2|1713243918000|value4", output.get(0));
+ }
+
@Test
public void testKvCsvByJsonConfig() {
try {