This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7851c07645 [INLONG-11134][SDK] Add Parquet formatted data source for 
Transform (#11182)
7851c07645 is described below

commit 7851c076454ae7023e64191067e498ab766a78eb
Author: Zkplo <[email protected]>
AuthorDate: Tue Sep 24 12:06:53 2024 +0800

    [INLONG-11134][SDK] Add Parquet formatted data source for Transform (#11182)
    
    Co-authored-by: ZKpLo <[email protected]>
---
 inlong-sdk/transform-sdk/pom.xml                   |  14 ++
 .../transform/decode/ParquetInputByteArray.java    |  69 +++++++++
 .../sdk/transform/decode/ParquetSourceData.java    | 172 +++++++++++++++++++++
 .../sdk/transform/decode/ParquetSourceDecoder.java | 105 +++++++++++++
 .../sdk/transform/decode/SourceDecoderFactory.java |   6 +
 .../sdk/transform/pojo/ParquetSourceInfo.java      | 103 ++++++++++++
 .../processor/AbstractProcessorTestBase.java       |  42 +++++
 .../process/processor/TestParquetCsvProcessor.java | 139 +++++++++++++++++
 8 files changed, 650 insertions(+)

diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
index f8e2388839..e922c97b54 100644
--- a/inlong-sdk/transform-sdk/pom.xml
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -68,6 +68,20 @@
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
         </dependency>
+        <!-- Parquet dependencies -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetInputByteArray.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetInputByteArray.java
new file mode 100644
index 0000000000..0eafc33518
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetInputByteArray.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.ByteArrayInputStream;
+
+public class ParquetInputByteArray implements InputFile {
+
+    private final byte[] data;
+
+    private class SeekableByteArrayInputStream extends ByteArrayInputStream {
+
+        public SeekableByteArrayInputStream(byte[] buf) {
+            super(buf);
+        }
+
+        public void setPos(int pos) {
+            this.pos = pos;
+        }
+
+        public int getPos() {
+            return this.pos;
+        }
+    }
+
+    public ParquetInputByteArray(byte[] data) {
+        this.data = data;
+    }
+
+    @Override
+    public long getLength() {
+        return this.data.length;
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+        return new DelegatingSeekableInputStream(new 
SeekableByteArrayInputStream(this.data)) {
+
+            @Override
+            public void seek(long newPos) {
+                ((SeekableByteArrayInputStream) this.getStream()).setPos((int) 
newPos);
+            }
+
+            @Override
+            public long getPos() {
+                return ((SeekableByteArrayInputStream) 
this.getStream()).getPos();
+            }
+        };
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
new file mode 100644
index 0000000000..02eff5167f
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * ParquetSourceData
+ */
+public class ParquetSourceData implements SourceData {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParquetSourceData.class);
+
+    public static final String ROOT_KEY = "$root.";
+
+    public static final String CHILD_KEY = "$child.";
+
+    private Group rootGroup;
+    private Group childParent;
+    private String childName;
+    private Charset srcCharset;
+    private Type childType;
+    private int rowCount = -1;
+
+    public ParquetSourceData(Group root, String childPath, Charset srcCharset) 
{
+        this.rootGroup = root;
+        String pathStr = "";
+        if (!StringUtils.isEmpty(childPath)) {
+            pathStr = childPath;
+        }
+        if (!StringUtils.isEmpty(pathStr)) {
+            String[] pathNodes = pathStr.split("\\.");
+            this.childName = pathNodes[pathNodes.length - 1];
+            this.childParent = parsePath(rootGroup, pathNodes);
+            if (this.childParent != null) {
+                this.childType = 
this.childParent.getType().getType(this.childName);
+                this.rowCount = 
this.childParent.getFieldRepetitionCount(this.childName);
+            }
+        }
+        this.srcCharset = srcCharset;
+    }
+    @Override
+    public int getRowCount() {
+        if (this.childParent == null) {
+            return 1;
+        } else {
+            return rowCount;
+        }
+    }
+
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        String fieldValue = "";
+        try {
+            if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
+                // Dealing with multi-level paths
+                fieldName = fieldName.substring(ROOT_KEY.length());
+                fieldValue = parseFields(fieldName, rootGroup);
+            } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
+                // To meet various situations
+                if (childType instanceof GroupType) {
+                    Group group = childParent.getGroup(childName, rowNum);
+                    if (childParent != null && rowNum < getRowCount()) {
+                        // Dealing with multi-level paths
+                        fieldName = fieldName.substring(CHILD_KEY.length());
+                        fieldValue = parseFields(fieldName, group);
+                    }
+                } else {
+                    fieldValue = getFieldValue(childParent, 
childName).toString();
+                }
+            }
+            return fieldValue;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        return fieldValue;
+    }
+
+    private String parseFields(String fieldName, Group rootGroup) {
+        String[] pathNodes = fieldName.split("\\.");
+        Group parNode = parsePath(rootGroup, pathNodes);
+        return getFieldValue(parNode, pathNodes[pathNodes.length - 
1]).toString();
+    }
+
+    /**
+     * Resolve to the parent Group of the path for subsequent value taking
+     * @param root Starting Group for Analysis
+     * @param path Analyze the path, separated by dots "."
+     * @return The parent group of the path
+     */
+    public Group parsePath(Group root, String[] path) {
+        Object cur = root, last = null;
+        int lastIdx = path.length - 1;
+        for (int i = 0; i <= lastIdx; i++) {
+            if (cur instanceof Group) {
+                Object value = getFieldValue((Group) cur, path[i]);
+                last = cur;
+                cur = value;
+            } else if (i == lastIdx) {
+                return (Group) last;
+            } else {
+                return null;
+            }
+            if (cur == null) {
+                return null;
+            }
+        }
+        return (Group) last;
+    }
+
+    public Object getFieldValue(Group group, String fieldName) {
+        try {
+            int idx = 0, start = fieldName.indexOf('(');
+            if (start != -1) {
+                idx = Integer.parseInt(fieldName.substring(start + 1, 
fieldName.indexOf(')')));
+                fieldName = fieldName.substring(0, start);
+            }
+            Type field = group.getType().getType(fieldName);
+            if (field.isPrimitive()) {
+                switch (field.asPrimitiveType().getPrimitiveTypeName()) {
+                    case INT32:
+                        return group.getInteger(fieldName, idx);
+                    case INT64:
+                        return group.getLong(fieldName, idx);
+                    case INT96:
+                        return group.getInt96(fieldName, idx);
+                    case FLOAT:
+                        return group.getFloat(fieldName, idx);
+                    case DOUBLE:
+                        return group.getDouble(fieldName, idx);
+                    case BOOLEAN:
+                        return group.getBoolean(fieldName, idx);
+                    case FIXED_LEN_BYTE_ARRAY:
+                        return group.getBinary(fieldName, idx).getBytes();
+                    case BINARY:
+                        return new String(group.getBinary(fieldName, 
idx).getBytes(), srcCharset);
+                    default:
+                        LOG.error("Unsupported type for field: {} ,field name: 
{}", field, fieldName);
+                        return null;
+                }
+            } else {
+                return group.getGroup(fieldName, idx);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            return null;
+        }
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
new file mode 100644
index 0000000000..11312370eb
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * PbSourceDecoder
+ */
+public class ParquetSourceDecoder implements SourceDecoder<byte[]> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParquetSourceDecoder.class);
+
+    protected ParquetSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private MessageType schema;
+    private String childMessagePath;
+    private String rootMessageLabel;
+
+    public ParquetSourceDecoder(ParquetSourceInfo sourceInfo) {
+        try {
+            this.sourceInfo = sourceInfo;
+            if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+                this.srcCharset = Charset.forName(sourceInfo.getCharset());
+            }
+            this.schema = 
MessageTypeParser.parseMessageType(sourceInfo.getParquetSchema());
+            this.childMessagePath = sourceInfo.getChildMessagePath();
+            this.rootMessageLabel = sourceInfo.getRootMessageLabel();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new TransformException(e.getMessage(), e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public SourceData decode(byte[] srcBytes, Context context) {
+        try {
+            // Create a custom InputFile
+            InputFile inputFile = new ParquetInputByteArray(srcBytes);
+
+            // Read Parquet data using ParquetFileReader
+            try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) 
{
+                // Retrieve the metadata of the file
+                ParquetMetadata footer = reader.getFooter();
+                MessageType schema = footer.getFileMetaData().getSchema();
+
+                PageReadStore pages;
+                while ((pages = reader.readNextRowGroup()) != null) {
+                    long rows = pages.getRowCount();
+
+                    ColumnIOFactory factory = new ColumnIOFactory();
+                    MessageColumnIO columnIO = factory.getColumnIO(schema);
+
+                    RecordMaterializer<Group> recordMaterializer = new 
GroupRecordConverter(schema);
+                    RecordReader<Group> recordReader = 
columnIO.getRecordReader(pages, recordMaterializer);
+
+                    for (int i = 0; i < rows; i++) {
+                        Group group = recordReader.read();
+                        if (group != null) {
+                            return new ParquetSourceData(group, 
this.childMessagePath, this.srcCharset);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index 041e1632eb..0f166dcef8 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;
 
@@ -46,10 +47,15 @@ public class SourceDecoderFactory {
     public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo 
sourceInfo) {
         return new AvroSourceDecoder(sourceInfo);
     }
+
     public static BsonSourceDecoder createBsonDecoder(BsonSourceInfo 
sourceInfo) {
         return new BsonSourceDecoder(sourceInfo);
     }
 
    /**
     * Creates a decoder for Parquet-formatted byte array sources.
     *
     * @param sourceInfo the Parquet source configuration (charset, schema, child path)
     * @return a new ParquetSourceDecoder
     */
    public static ParquetSourceDecoder createParquetDecoder(ParquetSourceInfo sourceInfo) {
        return new ParquetSourceDecoder(sourceInfo);
    }
+
     public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo 
sourceInfo) {
         return new YamlSourceDecoder(sourceInfo);
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSourceInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSourceInfo.java
new file mode 100644
index 0000000000..37a077e41e
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSourceInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * ParquetSourceInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ParquetSourceInfo extends SourceInfo {
+
+    private String parquetSchema;
+    private String rootMessageLabel;
+    private String childMessagePath;
+
+    @JsonCreator
+    public ParquetSourceInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("parquetSchema") String parquetSchema,
+            @JsonProperty("rootMessageLabel") String rootMessageLabel,
+            @JsonProperty("childMessagePath") String childMessagePath) {
+        super(charset);
+        this.parquetSchema = parquetSchema;
+        this.rootMessageLabel = rootMessageLabel;
+        this.childMessagePath = childMessagePath;
+    }
+
+    /**
+     * get ParquetSchema
+     *
+     * @return the protoDescription
+     */
+    @JsonProperty("ParquetSchema")
+    public String getParquetSchema() {
+        return parquetSchema;
+    }
+
+    /**
+     * set ParquetSchema
+     *
+     * @param parquetSchema the parquetSchema to set
+     */
+    public void setParquetSchema(String parquetSchema) {
+        this.parquetSchema = parquetSchema;
+    }
+
+    /**
+     * get rootMessageLabel
+     *
+     * @return the rootMessageLabel
+     */
+    @JsonProperty("rootMessageLabel")
+    public String getRootMessageLabel() {
+        return rootMessageLabel;
+    }
+
+    /**
+     * set rootMessageLabel
+     *
+     * @param rootMessageLabel the rootMessageLabel to set
+     */
+    public void setRootMessageLabel(String rootMessageLabel) {
+        this.rootMessageLabel = rootMessageLabel;
+    }
+
+    /**
+     * get childMessagePath
+     *
+     * @return the childMessagePath
+     */
+    @JsonProperty("childMessagePath")
+    public String getChildMessagePath() {
+        return childMessagePath;
+    }
+
+    /**
+     * set childMessagePath
+     *
+     * @param childMessagePath the childMessagePath to set
+     */
+    public void setChildMessagePath(String childMessagePath) {
+        this.childMessagePath = childMessagePath;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 0a71004789..e99a3c83c4 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -66,6 +66,48 @@ public abstract class AbstractProcessorTestBase {
         return transformBase64;
     }
 
+    protected String getParquetTestDescription() {
+        return "message SdkDataRequest { "
+                + "required binary sid (UTF8); "
+                + "required int64 packageID; "
+                + "repeated group msgs { "
+                + "  required binary msg (UTF8); "
+                + "  required int64 msgTime; "
+                + "  optional group extinfo (MAP) { "
+                + "    repeated group key_value { "
+                + "      required binary key (UTF8); "
+                + "      required binary value (UTF8); "
+                + "    } "
+                + "  } "
+                + "} "
+                + "}";
+    }
+
+    protected byte[] getParquetTestData() {
+        String srcString = 
"UEFSMRUAFRoVHiwVAhUAFQgVCBwYCXNlc3Npb25fMRgJc2Vzc2lvbl8xFg" +
+                
"AAAAANMAkAAABzZXNzaW9uXzEVABUQFRQsFQIVABUIFQgcGAjpAwAAAAAAABgI6QMAAAA" +
+                
"AAAAWAAAAAAgc6QMAAAAAAAAVABVMFUwsFQQVABUGFQYcGAtIZWxsbyBXb3JsZBgHQm9u" +
+                
"am91chYAAAAAJhQCAAAAAwIFBmgDCwAAAEhlbGxvIFdvcmxkBwAAAEJvbmpvdXIVABU4F" +
+                
"TgsFQQVABUGFQYcGAjellVhAAAAABgI05ZVYQAAAAAWAAAAABwUAgAAAAMCBQZAA9OWVW" +
+                
"EAAAAA3pZVYQAAAAAVBBUQFRRMFQIVBAAACBwEAAAAbGFuZxUAFSAVJCwVBBUEFQYVBhw" +
+                
"YBGxhbmcYBGxhbmcWAAAAABA8AwAAAAMEAAMAAAADDwAAAxUAFTQVNCwVBBUAFQYVBhwY" +
+                
"AkZSGAJFThYAAAAAGhgDAAAAAwQABQc0DwACAAAARU4CAAAARlIVAhmsSA5TZGtEYXRhU" +
+                
"mVxdWVzdBUGABUMJQAYA3NpZCUAABUEJQAYCXBhY2thZ2VJRAA1BBgEbXNncxUGABUMJQ" +
+                
"AYA21zZyUAABUEJQAYB21zZ1RpbWUANQIYB2V4dGluZm8VAhUCADUEGAlrZXlfdmFsdWU" +
+                
"VBAAVDCUAGANrZXklAAAVDCUAGAV2YWx1ZSUAABYCGRwZbCYIHBUMGSUIABkYA3NpZBUC" +
+                
"FgIWcBZ0Jgg8GAlzZXNzaW9uXzEYCXNlc3Npb25fMRYAAAAAJnwcFQQZJQgAGRgJcGFja" +
+                
"2FnZUlEFQIWAhZiFmYmfDwYCOkDAAAAAAAAGAjpAwAAAAAAABYAAAAAJuIBHBUMGSUABh" +
+                
"koBG1zZ3MDbXNnFQIWBBaiARaiASbiATwYC0hlbGxvIFdvcmxkGAdCb25qb3VyFgAAAAA" +
+                
"mhAMcFQQZJQAGGSgEbXNncwdtc2dUaW1lFQIWBBaKARaKASaEAzwYCN6WVWEAAAAAGAjT" +
+                
"llVhAAAAABYAAAAAJo4EHBUMGSUEBhlIBG1zZ3MHZXh0aW5mbwlrZXlfdmFsdWUDa2V5F" +
+                
"QIWBBaMARaUASaOBDwYBGxhbmcYBGxhbmcWAAAAACaiBRwVDBklAAYZSARtc2dzB2V4dG" +
+                
"luZm8Ja2V5X3ZhbHVlBXZhbHVlFQIWBBZuFm4mogU8GAJGUhgCRU4WAAAAABb4BRYCACh" +
+                
"JcGFycXVldC1tciB2ZXJzaW9uIDEuOC4xIChidWlsZCA0YWJhNGRhZTdiYjBkNGVkYmNm" +
+                "NzkyM2FlMTMzOWYyOGZkM2Y3ZmNmKQBeAgAAUEFSMQ==";
+        byte[] srcBytes = Base64.getDecoder().decode(srcString);
+        return srcBytes;
+    }
+
     protected byte[] getAvroTestData() {
         String srcString = 
"T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtE"
                 + 
"YXRhUmVxdWVzdCIsIm5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUi"
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestParquetCsvProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestParquetCsvProcessor.java
new file mode 100644
index 0000000000..0562161424
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestParquetCsvProcessor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestParquetCsvProcessor extends AbstractProcessorTestBase {
+
+    @Test
+    public void testParquet2Csv() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        String parquetTestDescription = this.getParquetTestDescription();
+        ParquetSourceInfo parquetSourceInfo =
+                new ParquetSourceInfo("UTF-8", parquetTestDescription, 
"SdkDataRequest", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+        String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        byte[] srcBytes = this.getParquetTestData();
+        List<String> output = processor.transform(srcBytes);
+        Assert.assertEquals(2, output.size());
+        Assert.assertEquals(output.get(0), "session_1|1001|1632999123|Hello 
World");
+        Assert.assertEquals(output.get(1), 
"session_1|1001|1632999134|Bonjour");
+    }
+
+    @Test
+    public void testParquet2CsvForOne() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        String parquetTestDescription = this.getParquetTestDescription();
+        ParquetSourceInfo parquetSourceInfo =
+                new ParquetSourceInfo("UTF-8", parquetTestDescription, 
"SdkDataRequest", null);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+        String transformSql = "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        byte[] srcBytes = this.getParquetTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(output.get(0), "session_1|1001|1632999134|Hello 
World");
+    }
+
+    @Test
+    public void testParquet2CsvForAdd() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        String parquetTestDescription = this.getParquetTestDescription();
+        ParquetSourceInfo parquetSourceInfo =
+                new ParquetSourceInfo("UTF-8", parquetTestDescription, 
"SdkDataRequest", null);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+
+        String transformSql = "select $root.sid,"
+                + 
"($root.msgs(1).msgTime-$root.msgs(0).msgTime+990)/$root.packageID field2,"
+                + 
"$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/($root.packageID
 - 1))"
+                + "*$root.packageID field3,"
+                + "$root.msgs(0).msg field4 from source "
+                + "where 
$root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
+                + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        byte[] srcBytes = this.getParquetTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(output.get(0), 
"session_1|1|1637904657266133390.134|Hello World");
+    }
+
+    @Test
+    public void testParquet2CsvForConcat() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        String parquetTestDescription = this.getParquetTestDescription();
+        ParquetSourceInfo parquetSourceInfo =
+                new ParquetSourceInfo("UTF-8", parquetTestDescription, 
"SdkDataRequest", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+        String transformSql = "select $root.sid,$root.packageID,$child.msg 
Time,"
+                + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) 
msg,$root.msgs.msgTime.msg from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        byte[] srcBytes = this.getParquetTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertTrue(output.size() == 2);
+        Assert.assertEquals(output.get(0), "session_1|1001|Hello 
World|session_110011632999123Hello World");
+        Assert.assertEquals(output.get(1), 
"session_1|1001|Bonjour|session_110011632999134Bonjour");
+    }
+
+    @Test
+    public void testParquet2CsvForNow() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        String parquetTestDescription = this.getParquetTestDescription();
+        ParquetSourceInfo parquetSourceInfo =
+                new ParquetSourceInfo("UTF-8", parquetTestDescription, 
"SdkDataRequest", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
+        String transformSql = "select now() from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createParquetDecoder(parquetSourceInfo),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        byte[] srcBytes = this.getParquetTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertEquals(2, output.size());
+    }
+}

Reply via email to