This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f489c51f0e [INLONG-11227][SDK] Add Parquet formatted data sink for Transform (#11245)
f489c51f0e is described below
commit f489c51f0e36f23cd9672437fcff3f9330248096
Author: Zkplo <[email protected]>
AuthorDate: Wed Oct 9 12:49:54 2024 +0800
[INLONG-11227][SDK] Add Parquet formatted data sink for Transform (#11245)
---
.../transform/encode/ParquetByteArrayWriter.java | 177 ++++++++++++++++++
.../transform/encode/ParquetOutputByteArray.java | 60 ++++++
.../sdk/transform/encode/ParquetSinkEncoder.java | 106 +++++++++++
...EncoderFactory.java => ParquetValueWriter.java} | 18 +-
...EncoderFactory.java => ParquetWriteRunner.java} | 23 +--
.../sdk/transform/encode/SinkEncoderFactory.java | 5 +
.../inlong/sdk/transform/pojo/ParquetSinkInfo.java | 64 +++++++
.../apache/inlong/sdk/transform/pojo/SinkInfo.java | 1 +
.../processor/AbstractProcessorTestBase.java | 49 +++++
.../processor/TestJson2ParquetProcessor.java | 208 +++++++++++++++++++++
10 files changed, 679 insertions(+), 32 deletions(-)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
new file mode 100644
index 0000000000..bfb072ea66
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
+public final class ParquetByteArrayWriter<T> implements Closeable {
+
+ private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
+ private final ParquetOutputByteArray outputByteArray;
+
+ public static <T> ParquetByteArrayWriter<T> buildWriter(MessageType schema, ParquetWriteRunner<T> writeRunner)
+ throws IOException {
+ return new ParquetByteArrayWriter<>(new ParquetOutputByteArray(), schema, writeRunner);
+ }
+
+ private ParquetByteArrayWriter(ParquetOutputByteArray outputFile, MessageType schema,
+ ParquetWriteRunner<T> writeRunner)
+ throws IOException {
+ this.writer = new Builder<T>(outputFile)
+ .withType(schema)
+ .withWriteRunner(writeRunner)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .build();
+ outputByteArray = outputFile;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.writer.close();
+ }
+
+ public void write(T record) throws IOException {
+ this.writer.write(record);
+ }
+
+ public ByteArrayOutputStream getByteArrayOutputStream() {
+ return outputByteArray.getByteArrayOutputStream();
+ }
+
+ private static final class Builder<T>
+ extends
+ org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetByteArrayWriter.Builder<T>> {
+
+ private MessageType schema;
+ private ParquetWriteRunner<T> writeRunner;
+
+ private Builder(OutputFile file) {
+ super(file);
+ }
+
+ public Builder<T> withType(MessageType schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public Builder<T> withWriteRunner(ParquetWriteRunner<T> writeRunner) {
+ this.writeRunner = writeRunner;
+ return this;
+ }
+
+ @Override
+ protected Builder<T> self() {
+ return this;
+ }
+
+ @Override
+ protected WriteSupport<T> getWriteSupport(Configuration conf) {
+ return new ParquetByteArrayWriter.SimpleWriteSupport<>(schema, writeRunner);
+ }
+ }
+
+ private static class SimpleWriteSupport<T> extends WriteSupport<T> {
+
+ private final MessageType schema;
+ private final ParquetWriteRunner<T> writeRunner;
+ private final ParquetValueWriter valueWriter;
+
+ private RecordConsumer recordConsumer;
+
+ SimpleWriteSupport(MessageType schema, ParquetWriteRunner<T> writeRunner) {
+ this.schema = schema;
+ this.writeRunner = writeRunner;
+ this.valueWriter = this::write;
+ }
+
+ public void write(String name, Object value) {
+ int fieldIndex = schema.getFieldIndex(name);
+ PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
+ recordConsumer.startField(name, fieldIndex);
+
+ switch (type.getPrimitiveTypeName()) {
+ case INT32:
+ recordConsumer.addInteger((int) value);
+ break;
+ case INT64:
+ recordConsumer.addLong((long) value);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble((double) value);
+ break;
+ case BOOLEAN:
+ recordConsumer.addBoolean((boolean) value);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat((float) value);
+ break;
+ case BINARY:
+ if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
+ recordConsumer.addBinary(Binary.fromString((String) value));
+ } else {
+ throw new UnsupportedOperationException(
+ "Don't support writing " +
type.getLogicalTypeAnnotation());
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Don't support
writing " + type.getPrimitiveTypeName());
+ }
+ recordConsumer.endField(name, fieldIndex);
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ return new WriteContext(schema, Collections.emptyMap());
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public void write(T record) {
+ recordConsumer.startMessage();
+ writeRunner.doWrite(record, valueWriter);
+ recordConsumer.endMessage();
+ }
+
+ @Override
+ public String getName() {
+ return null;
+ }
+ }
+}
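For reviewers who want to exercise the new writer in isolation, a minimal sketch follows; the demo class, the single field name "msg", and the sample values are illustrative only, not part of the commit:

    import org.apache.inlong.sdk.transform.encode.ParquetByteArrayWriter;
    import org.apache.inlong.sdk.transform.encode.ParquetWriteRunner;

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.Types;

    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

    public class ParquetByteArrayWriterDemo {

        public static void main(String[] args) throws Exception {
            // One required UTF-8 string column named "msg".
            MessageType schema = new MessageType("Demo",
                    Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("msg"));

            // The runner maps a record (a plain String here) onto named schema fields.
            ParquetWriteRunner<String> runner = (record, valueWriter) -> valueWriter.write("msg", record);

            ParquetByteArrayWriter<String> writer = ParquetByteArrayWriter.buildWriter(schema, runner);
            writer.write("hello");
            writer.write("world");
            writer.close(); // flushes the Parquet footer into the in-memory buffer

            byte[] parquetFile = writer.getByteArrayOutputStream().toByteArray();
            System.out.println("in-memory Parquet file: " + parquetFile.length + " bytes");
        }
    }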
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
new file mode 100644
index 0000000000..bf60301a10
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.parquet.io.DelegatingPositionOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class ParquetOutputByteArray implements OutputFile {
+
+ private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+ public ByteArrayOutputStream getByteArrayOutputStream() {
+ return byteArrayOutputStream;
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) throws IOException {
+ return createOrOverwrite(blockSizeHint);
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+ return new DelegatingPositionOutputStream(byteArrayOutputStream) {
+
+ @Override
+ public long getPos() throws IOException {
+ return byteArrayOutputStream.size();
+ }
+ };
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return false;
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return 1024L;
+ }
+}
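The test base later in this patch reads these buffers back through a ParquetInputByteArray on the decode side, which this diff does not include. For context, an in-memory InputFile counterpart typically looks like the following sketch; this is an assumed illustration, not the SDK's actual class:

    import org.apache.parquet.io.DelegatingSeekableInputStream;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.io.SeekableInputStream;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class InMemoryParquetInput implements InputFile {

        private final byte[] data;

        public InMemoryParquetInput(byte[] data) {
            this.data = data;
        }

        @Override
        public long getLength() throws IOException {
            return data.length;
        }

        @Override
        public SeekableInputStream newStream() throws IOException {
            ByteArrayInputStream in = new ByteArrayInputStream(data);
            return new DelegatingSeekableInputStream(in) {

                @Override
                public long getPos() throws IOException {
                    // ByteArrayInputStream.available() is (length - position)
                    return data.length - in.available();
                }

                @Override
                public void seek(long newPos) throws IOException {
                    in.reset();      // back to offset 0 (the initial mark)
                    in.skip(newPos); // then forward to the requested offset
                }
            };
        }
    }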
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
new file mode 100644
index 0000000000..168d7d0c44
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+/**
+ * ParquetSinkEncoder
+ */
+public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {
+
+ protected ParquetSinkInfo sinkInfo;
+ protected Charset sinkCharset = Charset.defaultCharset();
+
+ private final List<FieldInfo> fields;
+ private ParquetByteArrayWriter<Object[]> writer;
+
+ public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
+ this.sinkInfo = sinkInfo;
+ this.fields = sinkInfo.getFields();
+ ArrayList<Type> typesList = new ArrayList<>();
+ for (FieldInfo fieldInfo : this.fields) {
+ typesList.add(Types.required(BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .named(fieldInfo.getName()));
+ }
+ MessageType schema = new MessageType("Output", typesList);
+ ParquetWriteRunner<Object[]> writeRunner = (record, valueWriter) -> {
+ for (int i = 0; i < record.length; i++) {
+ valueWriter.write(this.fields.get(i).getName(), record[i]);
+ }
+ };
+ try {
+ writer = ParquetByteArrayWriter.buildWriter(schema, writeRunner);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
+ int size = this.fields.size();
+ Object[] rowsInfo = new Object[size];
+ Arrays.fill(rowsInfo, "");
+ for (int i = 0; i < size; i++) {
+ String fieldData = sinkData.getField(this.fields.get(i).getName());
+ if (fieldData == null) {
+ continue;
+ }
+ rowsInfo[i] = fieldData;
+ }
+ try {
+ writer.write(rowsInfo);
+ } catch (Exception ignored) {
+
+ }
+ return writer.getByteArrayOutputStream();
+ }
+
+ @Override
+ public List<FieldInfo> getFields() {
+ return this.fields;
+ }
+ public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
+ if (list.isEmpty()) {
+ return null;
+ }
+ try {
+ this.writer.close(); // must close first so the Parquet footer is flushed
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return list.get(0).toByteArray();
+ }
+}
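The encoder has a two-phase lifecycle worth noting: every encode() call appends one row and returns the same shared buffer, and mergeByteArray() must be called exactly once at the end because it closes the writer and thereby flushes the footer. A compact sketch of that flow, with a hypothetical helper class and method, and assuming the SDK's SinkData and Context types on the classpath:

    import org.apache.inlong.sdk.transform.encode.ParquetSinkEncoder;
    import org.apache.inlong.sdk.transform.encode.SinkData;
    import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
    import org.apache.inlong.sdk.transform.pojo.FieldInfo;
    import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
    import org.apache.inlong.sdk.transform.process.Context;

    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    public class ParquetEncodeSketch {

        // Encodes each row, then returns the single accumulated Parquet file.
        public static byte[] encodeAll(List<FieldInfo> fields, List<SinkData> rows, Context context) {
            ParquetSinkEncoder encoder =
                    SinkEncoderFactory.createParquetEncoder(new ParquetSinkInfo("UTF-8", fields));
            List<ByteArrayOutputStream> outputs = new ArrayList<>();
            for (SinkData row : rows) {
                // Each call writes one row and returns the same shared buffer.
                outputs.add(encoder.encode(row, context));
            }
            // Closes the writer (flushing the footer) and returns its bytes.
            return encoder.mergeByteArray(outputs);
        }
    }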
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
similarity index 58%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
index 30619078ac..0e3e57b999 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
@@ -17,21 +17,7 @@
package org.apache.inlong.sdk.transform.encode;
-import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+public interface ParquetValueWriter {
-public class SinkEncoderFactory {
-
- public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
- return new CsvSinkEncoder(csvSinkInfo);
- }
-
- public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
- return new KvSinkEncoder(kvSinkInfo);
- }
-
- public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
- return new MapSinkEncoder(mapSinkInfo);
- }
+ void write(String name, Object value);
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
similarity index 58%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
index 30619078ac..91c6fa590a 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
@@ -17,21 +17,12 @@
package org.apache.inlong.sdk.transform.encode;
-import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+public interface ParquetWriteRunner<T> {
-public class SinkEncoderFactory {
-
- public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
- return new CsvSinkEncoder(csvSinkInfo);
- }
-
- public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
- return new KvSinkEncoder(kvSinkInfo);
- }
-
- public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
- return new MapSinkEncoder(mapSinkInfo);
- }
+ /**
+ * Write the specified record as one Parquet row using the supplied writer.
+ * @param record the data to be written
+ * @param valueWriter the per-field Parquet value writer
+ */
+ void doWrite(T record, ParquetValueWriter valueWriter);
}
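ParquetSinkEncoder above supplies this runner as a lambda that writes each slot of an Object[] row to the schema field with the same index; a standalone equivalent, with hypothetical field names, would be:

    String[] names = {"sid", "packageID"};
    ParquetWriteRunner<Object[]> runner = (record, valueWriter) -> {
        for (int i = 0; i < record.length; i++) {
            valueWriter.write(names[i], record[i]); // field names[i] <- record[i]
        }
    };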
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 30619078ac..0fa308162b 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.transform.encode;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
public class SinkEncoderFactory {
@@ -34,4 +35,8 @@ public class SinkEncoderFactory {
public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
return new MapSinkEncoder(mapSinkInfo);
}
+
+ public static ParquetSinkEncoder createParquetEncoder(ParquetSinkInfo parquetSinkInfo) {
+ return new ParquetSinkEncoder(parquetSinkInfo);
+ }
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
new file mode 100644
index 0000000000..c54670e44a
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ParquetSinkInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ParquetSinkInfo extends SinkInfo {
+
+ private List<FieldInfo> fields;
+
+ @JsonCreator
+ public ParquetSinkInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("fields") List<FieldInfo> fields) {
+ super(SinkInfo.PARQUET, charset);
+ if (fields != null) {
+ this.fields = fields;
+ } else {
+ this.fields = new ArrayList<>();
+ }
+ }
+
+ /**
+ * get fields
+ * @return the fields
+ */
+ @JsonProperty("fields")
+ public List<FieldInfo> getFields() {
+ return fields;
+ }
+
+ /**
+ * set fields
+ * @param fields the fields to set
+ */
+ public void setFields(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index 9c61c6b46c..3c976c1b4c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -44,6 +44,7 @@ public abstract class SinkInfo {
public static final String CSV = "csv";
public static final String KV = "kv";
public static final String ES_MAP = "es_map";
+ public static final String PARQUET = "parquet";
@JsonIgnore
private String type;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index e99a3c83c4..3322d83199 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,11 +17,26 @@
package org.apache.inlong.sdk.transform.process.processor;
+import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
+
/**
* AbstractProcessorTestBase
* description: define static parameters for Processor tests
@@ -123,4 +138,38 @@ public abstract class AbstractProcessorTestBase {
byte[] srcBytes = Base64.getDecoder().decode(srcString);
return srcBytes;
}
+
+ public static List<String> ParquetByteArray2CsvStr(byte[] parquetBytes) throws IOException {
+ InputFile inputFile = new ParquetInputByteArray(parquetBytes);
+ List<String> strRows = new ArrayList<>();
+ try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
+ ParquetMetadata footer = reader.getFooter();
+ MessageType schema = footer.getFileMetaData().getSchema();
+ int fieldSize = schema.getFields().size();
+ PageReadStore pages;
+
+ while ((pages = reader.readNextRowGroup()) != null) {
+ long rows = pages.getRowCount();
+
+ ColumnIOFactory factory = new ColumnIOFactory();
+ MessageColumnIO columnIO = factory.getColumnIO(schema);
+
+ RecordMaterializer<Group> recordMaterializer = new GroupRecordConverter(schema);
+
+ RecordReader<Group> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
+
+ for (int i = 0; i < rows; i++) {
+ Group group = recordReader.read();
+ if (group != null) {
+ StringBuilder builder = new StringBuilder();
+ for (int j = 0; j < fieldSize; j++) {
+ builder.append(group.getValueToString(j, 0) + "|");
+ }
+ strRows.add(builder.substring(0, builder.length() - 1));
+ }
+ }
+ }
+ }
+ return strRows;
+ }
}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java
new file mode 100644
index 0000000000..30e2d4f9e5
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.ParquetSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestJson2ParquetProcessor extends AbstractProcessorTestBase {
+
+ @Test
+ public void testJson2Parquet() throws Exception {
+ List<FieldInfo> fields;
+ JsonSourceInfo jsonSource;
+ ParquetSinkInfo parquetSinkInfo;
+ ParquetSinkEncoder parquetEncoder;
+ String transformSql;
+ TransformConfig config;
+ TransformProcessor<String, ByteArrayOutputStream> processor;
+ String srcString;
+ List<ByteArrayOutputStream> output;
+ List<String> result;
+ byte[] bytes;
+
+ fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+ jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+ parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+ parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+ transformSql = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+ config = new TransformConfig(transformSql);
+ // case1
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+ parquetEncoder);
+ srcString = "{\n"
+ + " \"sid\":\"value1\",\n"
+ + " \"packageID\":\"value2\",\n"
+ + " \"msgs\":[\n"
+ + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+ + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+ + " ]\n"
+ + "}";
+ output = processor.transform(srcString, new HashMap<>());
+ bytes = parquetEncoder.mergeByteArray(output);
+ result = ParquetByteArray2CsvStr(bytes);
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("value1|value2|1713243918000|value4",
result.get(0));
+ Assert.assertEquals("value1|value2|1713243918000|v4", result.get(1));
+
+ fields = this.getTestFieldList("id", "itemId", "subItemId", "msg");
+ jsonSource = new JsonSourceInfo("UTF-8", "items");
+ parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+ parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+ transformSql = "select
$root.id,$child.itemId,$child.subItems(0).subItemId,$child.subItems(1).msg from
source";
+ config = new TransformConfig(transformSql);
+ // case2
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+ parquetEncoder);
+ srcString = "{\n"
+ + " \"id\":\"value1\",\n"
+ + " \"name\":\"value2\",\n"
+ + " \"items\":[\n"
+ + " {\"itemId\":\"item1\",\n"
+ + " \"subItems\":[\n"
+ + " {\"subItemId\":\"1001\", \"msg\":\"1001msg\"},\n"
+ + " {\"subItemId\":\"1002\", \"msg\":\"1002msg\"}\n"
+ + " ]\n"
+ + " },\n"
+ + " {\"itemId\":\"item2\",\n"
+ + " \"subItems\":[\n"
+ + " {\"subItemId\":\"2001\", \"msg\":\"2001msg\"},\n"
+ + " {\"subItemId\":\"2002\", \"msg\":\"2002msg\"}\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ output = processor.transform(srcString, new HashMap<>());
+ bytes = parquetEncoder.mergeByteArray(output);
+ result = ParquetByteArray2CsvStr(bytes);
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("value1|item1|1001|1002msg", result.get(0));
+ Assert.assertEquals("value1|item2|2001|2002msg", result.get(1));
+
+ fields = this.getTestFieldList("matrix(0,0)", "matrix(1,1)",
"matrix(2,2)");
+ jsonSource = new JsonSourceInfo("UTF-8", "");
+ parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+ parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+ transformSql = "select $root.matrix(0, 0), $root.matrix(1, 1),
$root.matrix(2, 2) from source";
+ config = new TransformConfig(transformSql);
+ // case3
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+ parquetEncoder);
+ srcString = "{\n"
+ + " \"matrix\": [\n"
+ + " [1, 2, 3],\n"
+ + " [4, 5, 6],\n"
+ + " [7, 8, 9]\n"
+ + " ]\n"
+ + "}";
+ output = processor.transform(srcString, new HashMap<>());
+ bytes = parquetEncoder.mergeByteArray(output);
+ result = ParquetByteArray2CsvStr(bytes);
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals("1|5|9", result.get(0));
+
+ fields = this.getTestFieldList("department_name", "course_id", "num");
+ jsonSource = new JsonSourceInfo("UTF-8", "");
+ parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+ parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+ transformSql =
+ "select $root.departments(0).name,
$root.departments(0).courses(0,1).courseId,
sqrt($root.departments(0).courses(0,1).courseId - 2) from source";
+ config = new TransformConfig(transformSql);
+ // case4
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+ parquetEncoder);
+ srcString = "{\n" +
+ " \"departments\": [\n" +
+ " {\n" +
+ " \"name\": \"Mathematics\",\n" +
+ " \"courses\": [\n" +
+ " [\n" +
+ " {\"courseId\": \"101\", \"title\": \"Calculus
I\"},\n" +
+ " {\"courseId\": \"102\", \"title\": \"Linear
Algebra\"}\n" +
+ " ],\n" +
+ " [\n" +
+ " {\"courseId\": \"201\", \"title\": \"Calculus
II\"},\n" +
+ " {\"courseId\": \"202\", \"title\": \"Abstract
Algebra\"}\n" +
+ " ]\n" +
+ " ]\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ output = processor.transform(srcString, new HashMap<>());
+ bytes = parquetEncoder.mergeByteArray(output);
+ result = ParquetByteArray2CsvStr(bytes);
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals("Mathematics|102|10.0", result.get(0));
+ }
+
+ @Test
+ public void testJson2ParquetForOne() throws Exception {
+ List<FieldInfo> fields;
+ JsonSourceInfo jsonSource;
+ ParquetSinkInfo parquetSinkInfo;
+ ParquetSinkEncoder parquetEncoder;
+ String transformSql;
+ TransformConfig config;
+ TransformProcessor<String, ByteArrayOutputStream> processor;
+ String srcString;
+ List<ByteArrayOutputStream> output;
+ List<String> result;
+ byte[] bytes;
+
+ fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+ jsonSource = new JsonSourceInfo("UTF-8", "");
+ parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+ parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+ transformSql = "select
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+ config = new TransformConfig(transformSql);
+ // case1
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+ parquetEncoder);
+ srcString = "{\n"
+ + " \"sid\":\"value1\",\n"
+ + " \"packageID\":\"value2\",\n"
+ + " \"msgs\":[\n"
+ + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+ + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+ + " ]\n"
+ + "}";
+ output = processor.transform(srcString, new HashMap<>());
+ bytes = parquetEncoder.mergeByteArray(output);
+ result = ParquetByteArray2CsvStr(bytes);
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals("value1|value2|1713243918000|value4",
result.get(0));
+ }
+}