This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ca4069293c [INLONG-11107][SDK] Avro Source Data Support Map Type (#11108)
ca4069293c is described below
commit ca4069293c9110a90f80f6262504f1c4639c2815
Author: Xincheng Huang <[email protected]>
AuthorDate: Fri Sep 20 17:28:12 2024 +0800
[INLONG-11107][SDK] Avro Source Data Support Map Type (#11108)
---
.../inlong/sdk/transform/decode/AvroSourceData.java | 12 ++++++++++++
.../process/processor/AbstractProcessorTestBase.java | 19 +++++++++++--------
.../process/processor/TestAvro2CsvProcessor.java | 9 +++++----
3 files changed, 28 insertions(+), 12 deletions(-)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
index c060c89af4..42705433f4 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
@@ -20,12 +20,14 @@ package org.apache.inlong.sdk.transform.decode;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.StringUtils;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class AvroSourceData implements SourceData {
@@ -91,6 +93,16 @@ public class AvroSourceData implements SourceData {
// parse other node
for (int i = 1; i < childNodes.size(); i++) {
AvroNode node = childNodes.get(i);
+ if (curSchema.getType() == Type.MAP) {
+ Map<?, ?> map = (Map<?, ?>) current;
+ Object mapValue = map.get(new Utf8(node.getName()));
+ if (mapValue == null) {
+ return "";
+ }
+ curSchema = curSchema.getValueType();
+ current = mapValue;
+ continue;
+ }
if (curSchema.getType() != Type.RECORD) {
// error data
return "";
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 9f05396667..0a71004789 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -67,14 +67,17 @@ public abstract class AbstractProcessorTestBase {
}
protected byte[] getAvroTestData() {
- String srcString =
"T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtEYXRhUmVxdWVzdCIs"
- +
"Im5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUiOiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIj"
- +
"oibXNncyIsInR5cGUiOnsidHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtNZ"
- +
"XNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVzIn0seyJuYW1lIjoibXNnVGltZSIsInR5"
- +
"cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ"
- +
"9fV19fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AI7h/J8SaFCGp012msD3lKMCngEIc2lkMQ"
- +
"QKQXBwbGXyhcYJBAhrZXkxCGtleTEIa2V5Mgx2YWx1ZTEADEJhbmFuYeSLjBMECGtleTEIa2V5MghrZXkyDHZhbHVlM"
- + "gAAgIkPjuH8nxJoUIanTXaawPeUow==";
+ String srcString =
"T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtE"
+ +
"YXRhUmVxdWVzdCIsIm5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUi"
+ +
"OiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoibXNncyIsInR5cGUiOnsi"
+ +
"dHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJT"
+ +
"ZGtNZXNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVz"
+ +
"In0seyJuYW1lIjoibXNnVGltZSIsInR5cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0"
+ +
"aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ9fV19"
+ +
"fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AMt7kQjpgkXl"
+ +
"EjM4Iv+oOJYClgEIc2lkMQQKQXBwbGXyhcYJBARrMQx2YWx1ZTEEazIMdmFsdWUy"
+ +
"AAxCYW5hbmHki4wTBARrMQx2YWx1ZTMEazIMdmFsdWU0AACAiQ/Le5EI6YJF5RIz"
+ + "OCL/qDiW";
byte[] srcBytes = Base64.getDecoder().decode(srcString);
return srcBytes;
}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
index fa0a361112..8ef3f9f3f5 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
@@ -34,10 +34,11 @@ public class TestAvro2CsvProcessor extends AbstractProcessorTestBase {
@Test
public void testAvro2Csv() throws Exception {
- List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg", "extinfo.k1");
AvroSourceInfo avroSource = new AvroSourceInfo("UTF-8", "msgs");
CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
- String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+ String transformSql =
+ "select $root.sid,$root.packageID,$child.msgTime,$child.msg,
$child.extinfo.k1 from source";
TransformConfig config = new TransformConfig(transformSql);
// case1
TransformProcessor<byte[], String> processor = TransformProcessor
@@ -46,7 +47,7 @@ public class TestAvro2CsvProcessor extends AbstractProcessorTestBase {
byte[] srcBytes = this.getAvroTestData();
List<String> output = processor.transform(srcBytes);
Assert.assertEquals(2, output.size());
- Assert.assertEquals(output.get(0), "sid1|123456|10011001|Apple");
- Assert.assertEquals(output.get(1), "sid1|123456|20022002|Banana");
+ Assert.assertEquals(output.get(0),
"sid1|123456|10011001|Apple|value1");
+ Assert.assertEquals(output.get(1),
"sid1|123456|20022002|Banana|value3");
}
}