This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7851c07645 [INLONG-11134][SDK] Add Parquet formatted data source for
Transform (#11182)
7851c07645 is described below
commit 7851c076454ae7023e64191067e498ab766a78eb
Author: Zkplo <[email protected]>
AuthorDate: Tue Sep 24 12:06:53 2024 +0800
[INLONG-11134][SDK] Add Parquet formatted data source for Transform (#11182)
Co-authored-by: ZKpLo <[email protected]>
---
inlong-sdk/transform-sdk/pom.xml | 14 ++
.../transform/decode/ParquetInputByteArray.java | 69 +++++++++
.../sdk/transform/decode/ParquetSourceData.java | 172 +++++++++++++++++++++
.../sdk/transform/decode/ParquetSourceDecoder.java | 105 +++++++++++++
.../sdk/transform/decode/SourceDecoderFactory.java | 6 +
.../sdk/transform/pojo/ParquetSourceInfo.java | 103 ++++++++++++
.../processor/AbstractProcessorTestBase.java | 42 +++++
.../process/processor/TestParquetCsvProcessor.java | 139 +++++++++++++++++
8 files changed, 650 insertions(+)
diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
index f8e2388839..e922c97b54 100644
--- a/inlong-sdk/transform-sdk/pom.xml
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -68,6 +68,20 @@
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
+ <!-- Parquet dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetInputByteArray.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetInputByteArray.java
new file mode 100644
index 0000000000..0eafc33518
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetInputByteArray.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.ByteArrayInputStream;
+
+public class ParquetInputByteArray implements InputFile {
+
+ private final byte[] data;
+
+ private class SeekableByteArrayInputStream extends ByteArrayInputStream {
+
+ public SeekableByteArrayInputStream(byte[] buf) {
+ super(buf);
+ }
+
+ public void setPos(int pos) {
+ this.pos = pos;
+ }
+
+ public int getPos() {
+ return this.pos;
+ }
+ }
+
+ public ParquetInputByteArray(byte[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public long getLength() {
+ return this.data.length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ return new DelegatingSeekableInputStream(new
SeekableByteArrayInputStream(this.data)) {
+
+ @Override
+ public void seek(long newPos) {
+ ((SeekableByteArrayInputStream) this.getStream()).setPos((int)
newPos);
+ }
+
+ @Override
+ public long getPos() {
+ return ((SeekableByteArrayInputStream)
this.getStream()).getPos();
+ }
+ };
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
new file mode 100644
index 0000000000..02eff5167f
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * ParquetSourceData
+ */
+public class ParquetSourceData implements SourceData {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ParquetSourceData.class);
+
+ public static final String ROOT_KEY = "$root.";
+
+ public static final String CHILD_KEY = "$child.";
+
+ private Group rootGroup;
+ private Group childParent;
+ private String childName;
+ private Charset srcCharset;
+ private Type childType;
+ private int rowCount = -1;
+
+ public ParquetSourceData(Group root, String childPath, Charset srcCharset)
{
+ this.rootGroup = root;
+ String pathStr = "";
+ if (!StringUtils.isEmpty(childPath)) {
+ pathStr = childPath;
+ }
+ if (!StringUtils.isEmpty(pathStr)) {
+ String[] pathNodes = pathStr.split("\\.");
+ this.childName = pathNodes[pathNodes.length - 1];
+ this.childParent = parsePath(rootGroup, pathNodes);
+ if (this.childParent != null) {
+ this.childType =
this.childParent.getType().getType(this.childName);
+ this.rowCount =
this.childParent.getFieldRepetitionCount(this.childName);
+ }
+ }
+ this.srcCharset = srcCharset;
+ }
+ @Override
+ public int getRowCount() {
+ if (this.childParent == null) {
+ return 1;
+ } else {
+ return rowCount;
+ }
+ }
+
+ @Override
+ public String getField(int rowNum, String fieldName) {
+ String fieldValue = "";
+ try {
+ if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
+ // Dealing with multi-level paths
+ fieldName = fieldName.substring(ROOT_KEY.length());
+ fieldValue = parseFields(fieldName, rootGroup);
+ } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
+ // To meet various situations
+ if (childType instanceof GroupType) {
+ Group group = childParent.getGroup(childName, rowNum);
+ if (childParent != null && rowNum < getRowCount()) {
+ // Dealing with multi-level paths
+ fieldName = fieldName.substring(CHILD_KEY.length());
+ fieldValue = parseFields(fieldName, group);
+ }
+ } else {
+ fieldValue = getFieldValue(childParent,
childName).toString();
+ }
+ }
+ return fieldValue;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return fieldValue;
+ }
+
+ private String parseFields(String fieldName, Group rootGroup) {
+ String[] pathNodes = fieldName.split("\\.");
+ Group parNode = parsePath(rootGroup, pathNodes);
+ return getFieldValue(parNode, pathNodes[pathNodes.length -
1]).toString();
+ }
+
+ /**
+ * Resolve to the parent Group of the path for subsequent value taking
+ * @param root Starting Group for Analysis
+ * @param path Analyze the path, separated by dots "."
+ * @return The parent group of the path
+ */
+ public Group parsePath(Group root, String[] path) {
+ Object cur = root, last = null;
+ int lastIdx = path.length - 1;
+ for (int i = 0; i <= lastIdx; i++) {
+ if (cur instanceof Group) {
+ Object value = getFieldValue((Group) cur, path[i]);
+ last = cur;
+ cur = value;
+ } else if (i == lastIdx) {
+ return (Group) last;
+ } else {
+ return null;
+ }
+ if (cur == null) {
+ return null;
+ }
+ }
+ return (Group) last;
+ }
+
+ public Object getFieldValue(Group group, String fieldName) {
+ try {
+ int idx = 0, start = fieldName.indexOf('(');
+ if (start != -1) {
+ idx = Integer.parseInt(fieldName.substring(start + 1,
fieldName.indexOf(')')));
+ fieldName = fieldName.substring(0, start);
+ }
+ Type field = group.getType().getType(fieldName);
+ if (field.isPrimitive()) {
+ switch (field.asPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return group.getInteger(fieldName, idx);
+ case INT64:
+ return group.getLong(fieldName, idx);
+ case INT96:
+ return group.getInt96(fieldName, idx);
+ case FLOAT:
+ return group.getFloat(fieldName, idx);
+ case DOUBLE:
+ return group.getDouble(fieldName, idx);
+ case BOOLEAN:
+ return group.getBoolean(fieldName, idx);
+ case FIXED_LEN_BYTE_ARRAY:
+ return group.getBinary(fieldName, idx).getBytes();
+ case BINARY:
+ return new String(group.getBinary(fieldName,
idx).getBytes(), srcCharset);
+ default:
+ LOG.error("Unsupported type for field: {} ,field name:
{}", field, fieldName);
+ return null;
+ }
+ } else {
+ return group.getGroup(fieldName, idx);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
new file mode 100644
index 0000000000..11312370eb
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * PbSourceDecoder
+ */
+public class ParquetSourceDecoder implements SourceDecoder<byte[]> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ParquetSourceDecoder.class);
+
+ protected ParquetSourceInfo sourceInfo;
+ private Charset srcCharset = Charset.defaultCharset();
+ private MessageType schema;
+ private String childMessagePath;
+ private String rootMessageLabel;
+
+ public ParquetSourceDecoder(ParquetSourceInfo sourceInfo) {
+ try {
+ this.sourceInfo = sourceInfo;
+ if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+ this.srcCharset = Charset.forName(sourceInfo.getCharset());
+ }
+ this.schema =
MessageTypeParser.parseMessageType(sourceInfo.getParquetSchema());
+ this.childMessagePath = sourceInfo.getChildMessagePath();
+ this.rootMessageLabel = sourceInfo.getRootMessageLabel();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new TransformException(e.getMessage(), e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public SourceData decode(byte[] srcBytes, Context context) {
+ try {
+ // Create a custom InputFile
+ InputFile inputFile = new ParquetInputByteArray(srcBytes);
+
+ // Read Parquet data using ParquetFileReader
+ try (ParquetFileReader reader = ParquetFileReader.open(inputFile))
{
+ // Retrieve the metadata of the file
+ ParquetMetadata footer = reader.getFooter();
+ MessageType schema = footer.getFileMetaData().getSchema();
+
+ PageReadStore pages;
+ while ((pages = reader.readNextRowGroup()) != null) {
+ long rows = pages.getRowCount();
+
+ ColumnIOFactory factory = new ColumnIOFactory();
+ MessageColumnIO columnIO = factory.getColumnIO(schema);
+
+ RecordMaterializer<Group> recordMaterializer = new
GroupRecordConverter(schema);
+ RecordReader<Group> recordReader =
columnIO.getRecordReader(pages, recordMaterializer);
+
+ for (int i = 0; i < rows; i++) {
+ Group group = recordReader.read();
+ if (group != null) {
+ return new ParquetSourceData(group,
this.childMessagePath, this.srcCharset);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return null;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index 041e1632eb..0f166dcef8 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;
@@ -46,10 +47,15 @@ public class SourceDecoderFactory {
public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo
sourceInfo) {
return new AvroSourceDecoder(sourceInfo);
}
+
public static BsonSourceDecoder createBsonDecoder(BsonSourceInfo
sourceInfo) {
return new BsonSourceDecoder(sourceInfo);
}
    /**
     * Creates a decoder for Parquet-formatted source data.
     *
     * @param sourceInfo the Parquet source configuration
     * @return a new {@link ParquetSourceDecoder}
     */
    public static ParquetSourceDecoder createParquetDecoder(ParquetSourceInfo sourceInfo) {
        return new ParquetSourceDecoder(sourceInfo);
    }
+
public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo
sourceInfo) {
return new YamlSourceDecoder(sourceInfo);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSourceInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSourceInfo.java
new file mode 100644
index 0000000000..37a077e41e
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSourceInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * ParquetSourceInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ParquetSourceInfo extends SourceInfo {
+
+ private String parquetSchema;
+ private String rootMessageLabel;
+ private String childMessagePath;
+
+ @JsonCreator
+ public ParquetSourceInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("parquetSchema") String parquetSchema,
+ @JsonProperty("rootMessageLabel") String rootMessageLabel,
+ @JsonProperty("childMessagePath") String childMessagePath) {
+ super(charset);
+ this.parquetSchema = parquetSchema;
+ this.rootMessageLabel = rootMessageLabel;
+ this.childMessagePath = childMessagePath;
+ }
+
+ /**
+ * get ParquetSchema
+ *
+ * @return the protoDescription
+ */
+ @JsonProperty("ParquetSchema")
+ public String getParquetSchema() {
+ return parquetSchema;
+ }
+
+ /**
+ * set ParquetSchema
+ *
+ * @param parquetSchema the parquetSchema to set
+ */
+ public void setParquetSchema(String parquetSchema) {
+ this.parquetSchema = parquetSchema;
+ }
+
+ /**
+ * get rootMessageLabel
+ *
+ * @return the rootMessageLabel
+ */
+ @JsonProperty("rootMessageLabel")
+ public String getRootMessageLabel() {
+ return rootMessageLabel;
+ }
+
+ /**
+ * set rootMessageLabel
+ *
+ * @param rootMessageLabel the rootMessageLabel to set
+ */
+ public void setRootMessageLabel(String rootMessageLabel) {
+ this.rootMessageLabel = rootMessageLabel;
+ }
+
+ /**
+ * get childMessagePath
+ *
+ * @return the childMessagePath
+ */
+ @JsonProperty("childMessagePath")
+ public String getChildMessagePath() {
+ return childMessagePath;
+ }
+
+ /**
+ * set childMessagePath
+ *
+ * @param childMessagePath the childMessagePath to set
+ */
+ public void setChildMessagePath(String childMessagePath) {
+ this.childMessagePath = childMessagePath;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 0a71004789..e99a3c83c4 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -66,6 +66,48 @@ public abstract class AbstractProcessorTestBase {
return transformBase64;
}
    /**
     * Returns the Parquet message schema used by the Parquet decode tests;
     * it matches the binary payload returned by {@code getParquetTestData()}.
     */
    protected String getParquetTestDescription() {
        return "message SdkDataRequest { "
                + "required binary sid (UTF8); "
                + "required int64 packageID; "
                + "repeated group msgs { "
                + " required binary msg (UTF8); "
                + " required int64 msgTime; "
                + " optional group extinfo (MAP) { "
                + " repeated group key_value { "
                + " required binary key (UTF8); "
                + " required binary value (UTF8); "
                + " } "
                + " } "
                + "} "
                + "}";
    }
+
    /**
     * Returns the test payload for the Parquet decode tests: a complete
     * Parquet file (note the "PAR1" magic at both ends once decoded),
     * stored here as a base64 string and decoded to raw bytes.
     */
    protected byte[] getParquetTestData() {
        String srcString = "UEFSMRUAFRoVHiwVAhUAFQgVCBwYCXNlc3Npb25fMRgJc2Vzc2lvbl8xFg"
                + "AAAAANMAkAAABzZXNzaW9uXzEVABUQFRQsFQIVABUIFQgcGAjpAwAAAAAAABgI6QMAAAA"
                + "AAAAWAAAAAAgc6QMAAAAAAAAVABVMFUwsFQQVABUGFQYcGAtIZWxsbyBXb3JsZBgHQm9u"
                + "am91chYAAAAAJhQCAAAAAwIFBmgDCwAAAEhlbGxvIFdvcmxkBwAAAEJvbmpvdXIVABU4F"
                + "TgsFQQVABUGFQYcGAjellVhAAAAABgI05ZVYQAAAAAWAAAAABwUAgAAAAMCBQZAA9OWVW"
                + "EAAAAA3pZVYQAAAAAVBBUQFRRMFQIVBAAACBwEAAAAbGFuZxUAFSAVJCwVBBUEFQYVBhw"
                + "YBGxhbmcYBGxhbmcWAAAAABA8AwAAAAMEAAMAAAADDwAAAxUAFTQVNCwVBBUAFQYVBhwY"
                + "AkZSGAJFThYAAAAAGhgDAAAAAwQABQc0DwACAAAARU4CAAAARlIVAhmsSA5TZGtEYXRhU"
                + "mVxdWVzdBUGABUMJQAYA3NpZCUAABUEJQAYCXBhY2thZ2VJRAA1BBgEbXNncxUGABUMJQ"
                + "AYA21zZyUAABUEJQAYB21zZ1RpbWUANQIYB2V4dGluZm8VAhUCADUEGAlrZXlfdmFsdWU"
                + "VBAAVDCUAGANrZXklAAAVDCUAGAV2YWx1ZSUAABYCGRwZbCYIHBUMGSUIABkYA3NpZBUC"
                + "FgIWcBZ0Jgg8GAlzZXNzaW9uXzEYCXNlc3Npb25fMRYAAAAAJnwcFQQZJQgAGRgJcGFja"
                + "2FnZUlEFQIWAhZiFmYmfDwYCOkDAAAAAAAAGAjpAwAAAAAAABYAAAAAJuIBHBUMGSUABh"
                + "koBG1zZ3MDbXNnFQIWBBaiARaiASbiATwYC0hlbGxvIFdvcmxkGAdCb25qb3VyFgAAAAA"
                + "mhAMcFQQZJQAGGSgEbXNncwdtc2dUaW1lFQIWBBaKARaKASaEAzwYCN6WVWEAAAAAGAjT"
                + "llVhAAAAABYAAAAAJo4EHBUMGSUEBhlIBG1zZ3MHZXh0aW5mbwlrZXlfdmFsdWUDa2V5F"
                + "QIWBBaMARaUASaOBDwYBGxhbmcYBGxhbmcWAAAAACaiBRwVDBklAAYZSARtc2dzB2V4dG"
                + "luZm8Ja2V5X3ZhbHVlBXZhbHVlFQIWBBZuFm4mogU8GAJGUhgCRU4WAAAAABb4BRYCACh"
                + "JcGFycXVldC1tciB2ZXJzaW9uIDEuOC4xIChidWlsZCA0YWJhNGRhZTdiYjBkNGVkYmNm"
                + "NzkyM2FlMTMzOWYyOGZkM2Y3ZmNmKQBeAgAAUEFSMQ==";
        byte[] srcBytes = Base64.getDecoder().decode(srcString);
        return srcBytes;
    }
+
protected byte[] getAvroTestData() {
String srcString =
"T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtE"
+
"YXRhUmVxdWVzdCIsIm5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUi"
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestParquetCsvProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestParquetCsvProcessor.java
new file mode 100644
index 0000000000..0562161424
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestParquetCsvProcessor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestParquetCsvProcessor extends AbstractProcessorTestBase {
+
+ @Test
+ public void testParquet2Csv() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ String parquetTestDescription = this.getParquetTestDescription();
+ ParquetSourceInfo parquetSourceInfo =
+ new ParquetSourceInfo("UTF-8", parquetTestDescription,
"SdkDataRequest", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+ String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ byte[] srcBytes = this.getParquetTestData();
+ List<String> output = processor.transform(srcBytes);
+ Assert.assertEquals(2, output.size());
+ Assert.assertEquals(output.get(0), "session_1|1001|1632999123|Hello
World");
+ Assert.assertEquals(output.get(1),
"session_1|1001|1632999134|Bonjour");
+ }
+
+ @Test
+ public void testParquet2CsvForOne() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ String parquetTestDescription = this.getParquetTestDescription();
+ ParquetSourceInfo parquetSourceInfo =
+ new ParquetSourceInfo("UTF-8", parquetTestDescription,
"SdkDataRequest", null);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+ String transformSql = "select
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ byte[] srcBytes = this.getParquetTestData();
+ List<String> output = processor.transform(srcBytes, new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals(output.get(0), "session_1|1001|1632999134|Hello
World");
+ }
+
+ @Test
+ public void testParquet2CsvForAdd() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ String parquetTestDescription = this.getParquetTestDescription();
+ ParquetSourceInfo parquetSourceInfo =
+ new ParquetSourceInfo("UTF-8", parquetTestDescription,
"SdkDataRequest", null);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+
+ String transformSql = "select $root.sid,"
+ +
"($root.msgs(1).msgTime-$root.msgs(0).msgTime+990)/$root.packageID field2,"
+ +
"$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/($root.packageID
- 1))"
+ + "*$root.packageID field3,"
+ + "$root.msgs(0).msg field4 from source "
+ + "where
$root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
+ + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ byte[] srcBytes = this.getParquetTestData();
+ List<String> output = processor.transform(srcBytes, new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals(output.get(0),
"session_1|1|1637904657266133390.134|Hello World");
+ }
+
+ @Test
+ public void testParquet2CsvForConcat() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ String parquetTestDescription = this.getParquetTestDescription();
+ ParquetSourceInfo parquetSourceInfo =
+ new ParquetSourceInfo("UTF-8", parquetTestDescription,
"SdkDataRequest", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+ String transformSql = "select $root.sid,$root.packageID,$child.msg
Time,"
+ + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg)
msg,$root.msgs.msgTime.msg from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ byte[] srcBytes = this.getParquetTestData();
+ List<String> output = processor.transform(srcBytes, new HashMap<>());
+ Assert.assertTrue(output.size() == 2);
+ Assert.assertEquals(output.get(0), "session_1|1001|Hello
World|session_110011632999123Hello World");
+ Assert.assertEquals(output.get(1),
"session_1|1001|Bonjour|session_110011632999134Bonjour");
+ }
+
+ @Test
+ public void testParquet2CsvForNow() throws Exception {
+ List<FieldInfo> fields = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ String parquetTestDescription = this.getParquetTestDescription();
+ ParquetSourceInfo parquetSourceInfo =
+ new ParquetSourceInfo("UTF-8", parquetTestDescription,
"SdkDataRequest", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+ String transformSql = "select now() from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ byte[] srcBytes = this.getParquetTestData();
+ List<String> output = processor.transform(srcBytes, new HashMap<>());
+ Assert.assertEquals(2, output.size());
+ }
+}