This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new 5c6b11329c [Bug][Connector][FileBase]Parquet reader parsing array type exception. (#4457)
5c6b11329c is described below
commit 5c6b11329cb98764af664fb46ed4aa9481eb73e1
Author: lightzhao <[email protected]>
AuthorDate: Tue Nov 28 15:56:15 2023 +0800
    [Bug][Connector][FileBase]Parquet reader parsing array type exception. (#4457)
---------
Co-authored-by: lightzhao <[email protected]>
---
.../file/source/reader/ParquetReadStrategy.java | 12 ++-
.../file/writer/ParquetReadStrategyTest.java | 92 ++++++++++++++++++++++
2 files changed, 103 insertions(+), 1 deletion(-)
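
For readers skimming the patch: Avro hands Parquet string-array elements back as org.apache.avro.util.Utf8 rather than java.lang.String, so copying them into the String[] that SeaTunnel expects fails; the fix converts each Utf8 element with toString() while draining the GenericData.Array. The following minimal, self-contained sketch illustrates that conversion (the class name Utf8ArrayConversionSketch and the standalone main method are illustrative only and are not part of the commit):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;

import java.util.ArrayList;
import java.util.List;

public class Utf8ArrayConversionSketch {
    public static void main(String[] args) {
        // Avro models a string array (like the test schema's "skills" field)
        // but materializes each element as org.apache.avro.util.Utf8.
        Schema arraySchema =
                new Schema.Parser().parse("{\"type\":\"array\",\"items\":\"string\"}");
        GenericData.Array<Utf8> avroArray = new GenericData.Array<>(2, arraySchema);
        avroArray.add(new Utf8("Java"));
        avroArray.add(new Utf8("Python"));

        // Same conversion idea as the patched forEachRemaining lambda:
        // turn Utf8 elements into java.lang.String before building the array.
        List<Object> converted = new ArrayList<>();
        avroArray.forEach(ele -> converted.add(ele instanceof Utf8 ? ele.toString() : ele));

        // Without the conversion, toArray(new String[0]) would hit an
        // ArrayStoreException because Utf8 is not assignable to String.
        String[] skills = converted.toArray(new String[0]);
        System.out.println(String.join(",", skills)); // prints: Java,Python
    }
}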
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index e7a0c0af4a..fcc1ca6e43 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -37,6 +37,7 @@ import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -132,7 +133,16 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
switch (fieldType.getSqlType()) {
case ARRAY:
ArrayList<Object> origArray = new ArrayList<>();
- ((GenericData.Array<?>) field).iterator().forEachRemaining(origArray::add);
+ ((GenericData.Array<?>) field)
+ .iterator()
+ .forEachRemaining(
+ ele -> {
+ if (ele instanceof Utf8) {
+ origArray.add(ele.toString());
+ } else {
+ origArray.add(ele);
+ }
+ });
SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
switch (elementType.getSqlType()) {
case STRING:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index ffac80407b..731fa8601d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -21,17 +21,32 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.LocalDateTime;
@@ -157,6 +172,29 @@ public class ParquetReadStrategyTest {
parquetReadStrategy.read(path, "", testCollector);
}
+ @DisabledOnOs(OS.WINDOWS)
+ @Test
+ public void testParquetReadArray() throws Exception {
+ AutoGenerateParquetData.generateTestData();
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(
+ localConf, AutoGenerateParquetData.DATA_FILE_PATH);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ Assertions.assertEquals(seaTunnelRowTypeInfo.getFieldType(3).getClass(), ArrayType.class);
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(AutoGenerateParquetData.DATA_FILE_PATH, "1", testCollector);
+ List<SeaTunnelRow> rows = testCollector.getRows();
+ SeaTunnelRow seaTunnelRow = rows.get(0);
+ Assertions.assertEquals(seaTunnelRow.getField(1).toString(), "Alice");
+ String[] arrayData = (String[]) seaTunnelRow.getField(3);
+ Assertions.assertEquals(arrayData.length, 2);
+ Assertions.assertEquals(arrayData[0], "Java");
+ AutoGenerateParquetData.deleteFile();
+ }
+
public static class TestCollector implements Collector<SeaTunnelRow> {
private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -195,4 +233,58 @@ public class ParquetReadStrategyTest {
return SCHEMA;
}
}
+
+ public static class AutoGenerateParquetData {
+
+ public static final String DATA_FILE_PATH = "/tmp/data.parquet";
+
+ public static void generateTestData() throws IOException {
+ deleteFile();
+ String schemaString =
+ "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+ Schema schema = new Schema.Parser().parse(schemaString);
+
+ Configuration conf = new Configuration();
+
+ Path file = new Path(DATA_FILE_PATH);
+
+ ParquetWriter<GenericRecord> writer =
+ AvroParquetWriter.<GenericRecord>builder(file)
+ .withSchema(schema)
+ .withConf(conf)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .build();
+
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", 1);
+ record1.put("name", "Alice");
+ record1.put("salary", 50000.0);
+ GenericArray<Utf8> skills1 =
+ new GenericData.Array<>(2, schema.getField("skills").schema());
+ skills1.add(new Utf8("Java"));
+ skills1.add(new Utf8("Python"));
+ record1.put("skills", skills1);
+ writer.write(record1);
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", 2);
+ record2.put("name", "Bob");
+ record2.put("salary", 60000.0);
+ GenericArray<Utf8> skills2 =
+ new GenericData.Array<>(2, schema.getField("skills").schema());
+ skills2.add(new Utf8("C++"));
+ skills2.add(new Utf8("Go"));
+ record2.put("skills", skills2);
+ writer.write(record2);
+
+ writer.close();
+ }
+
+ public static void deleteFile() {
+ File parquetFile = new File(DATA_FILE_PATH);
+ if (parquetFile.exists()) {
+ parquetFile.delete();
+ }
+ }
+ }
}