SinghAsDev commented on a change in pull request #3774:
URL: https://github.com/apache/iceberg/pull/3774#discussion_r791088471
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
##########
@@ -96,11 +96,13 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {
   @Override
   public Type list(GroupType array, Type elementType) {
-    GroupType repeated = array.getType(0).asGroupType();
-    org.apache.parquet.schema.Type element = repeated.getType(0);
+    org.apache.parquet.schema.Type repeated = array.getType(0);
+    org.apache.parquet.schema.Type repeatedElement = array.getFields().get(0);
+    boolean isElementType = ParquetSchemaUtil.isListElementType(repeatedElement, array.getName());
+    org.apache.parquet.schema.Type element = isElementType ? repeated : repeated.asGroupType().getType(0);
     Preconditions.checkArgument(
-        !element.isRepetition(Repetition.REPEATED),
+        isElementType || !element.isRepetition(Repetition.REPEATED),
         "Elements cannot have repetition REPEATED: %s", element);
Review comment:
Yeah, I forgot to drop this in the earlier update. Dropping it.
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
##########
@@ -164,4 +164,48 @@ public Boolean primitive(PrimitiveType primitive) {
     }
   }
+  // Parquet LIST backwards-compatibility rules.
+  // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+  public static boolean isOldListElementType(Type repeatedType, String parentName) {
Review comment:
This is also used outside of iceberg-parquet, e.g. in iceberg-spark. Good point on making the method accept the list directly; updating.
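
For context, the linked backward-compatibility rules boil down to a short structural check. A minimal sketch, assuming the `org.apache.parquet.schema.Type` API (the exact method this PR adds may differ in naming and structure):

```java
import org.apache.parquet.schema.Type;

// Sketch of the parquet-format LIST backward-compatibility rules: the
// repeated field is itself the element (old two-level form) when it is a
// primitive, a group with more than one field, or a group named "array"
// or "<parent>_tuple"; otherwise it is the standard three-level wrapper.
static boolean isOldListElementType(Type repeatedType, String parentName) {
  return repeatedType.isPrimitive() ||
      repeatedType.asGroupType().getFieldCount() > 1 ||
      repeatedType.getName().equals("array") ||
      repeatedType.getName().equals(parentName + "_tuple");
}
```

Note the short-circuit ordering: `asGroupType()` is only reached when the repeated field is not a primitive, so the cast is safe.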
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
##########
@@ -96,11 +96,13 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {
   @Override
   public Type list(GroupType array, Type elementType) {
-    GroupType repeated = array.getType(0).asGroupType();
-    org.apache.parquet.schema.Type element = repeated.getType(0);
+    org.apache.parquet.schema.Type repeated = array.getType(0);
+    org.apache.parquet.schema.Type repeatedElement = array.getFields().get(0);
Review comment:
Likely a copy-paste error; we only need one of them. Updating.
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
##########
@@ -108,15 +108,16 @@ public Type struct(GroupType struct, List<Type> fields) {
   @Override
   public Type list(GroupType list, Type element) {
-    GroupType repeated = list.getType(0).asGroupType();
-    Type originalElement = repeated.getType(0);
+    Type repeated = list.getType(0);
+    boolean isOldListElementType = ParquetSchemaUtil.isOldListElementType(repeated, list.getName());
+    Type originalElement = isOldListElementType ? repeated : repeated.asGroupType().getType(0);
     Integer elementId = getId(originalElement);
     if (elementId != null && selectedIds.contains(elementId)) {
       return list;
     } else if (element != null) {
       if (!Objects.equal(element, originalElement)) {
-        return list.withNewFields(repeated.withNewFields(element));
+        return list.withNewFields(repeated.asGroupType().withNewFields(element));
Review comment:
Thanks for catching this.
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -169,6 +178,60 @@ public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException
     }
   }
+  @Test
+  public void testWriteReadAvroBinary() throws IOException {
+    String schema = "{" +
+        "\"type\":\"record\"," +
+        "\"name\":\"DbRecord\"," +
+        "\"namespace\":\"com.iceberg\"," +
+        "\"fields\":[" +
+        "{\"name\":\"arraybytes\", " +
+        "\"type\":[ \"null\", { \"type\":\"array\", \"items\":\"bytes\"}], \"default\":null}," +
+        "{\"name\":\"topbytes\", \"type\":\"bytes\"}" +
+        "]" +
+        "}";
+
+    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+    org.apache.avro.Schema avroSchema = parser.parse(schema);
+    AvroSchemaConverter converter = new AvroSchemaConverter();
+    MessageType parquetSchema = converter.convert(avroSchema);
+    Schema icebergSchema = ParquetSchemaUtil.convert(parquetSchema);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue(testFile.delete());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+        .withDataModel(GenericData.get())
+        .withSchema(avroSchema)
+        .config("parquet.avro.add-list-element-records", "true")
+        .config("parquet.avro.write-old-list-structure", "true")
+        .build();
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    List<ByteBuffer> expectedByteList = new ArrayList<>();
+    byte[] expectedByte = {0x00, 0x01};
+    expectedByteList.add(ByteBuffer.wrap(expectedByte));
+
+    recordBuilder.set("arraybytes", expectedByteList);
+    recordBuilder.set("topbytes", ByteBuffer.wrap(expectedByte));
+    GenericData.Record record = recordBuilder.build();
+    writer.write(record);
+    writer.close();
+
+    List<InternalRow> rows;
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(Files.localInput(testFile))
+            .project(icebergSchema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type))
+            .build()) {
Review comment:
`ParquetSchemaUtil.convert(MessageType)` assigns fallback ids to top-level fields, so the fields were being read.
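
For anyone following along, here is a rough sketch of what assigning "fallback ids" could look like, using the parquet `Type.withId` API. Iceberg's actual convert logic lives in `ParquetSchemaUtil` and may differ in detail, so treat this as illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

// Illustrative only: give each top-level field that lacks a field id a
// sequential fallback id, so projection by id can still resolve columns
// in files written without Iceberg field ids.
static MessageType withFallbackIds(MessageType fileSchema) {
  List<Type> fields = new ArrayList<>();
  int nextId = 1;
  for (Type field : fileSchema.getFields()) {
    fields.add(field.getId() == null ? field.withId(nextId) : field);
    nextId += 1;
  }
  return new MessageType(fileSchema.getName(), fields);
}
```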
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
##########
@@ -610,7 +623,71 @@ public void testStructOfThreeLevelLists() throws Exception {
     structOfThreeLevelLists(false);
   }
-  public void threeLevelList(boolean useLegacyMode) throws Exception {
+  @Test
+  public void testTwoLevelList() throws IOException {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", true);
+
+    String tableName = sourceName("testTwoLevelList");
+    File location = temp.newFolder();
+
+    StructType sparkSchema =
+        new StructType(
+            new StructField[]{
+                new StructField(
+                    "col1", new ArrayType(
+                        new StructType(
+                            new StructField[]{
+                                new StructField(
+                                    "col2",
+                                    DataTypes.IntegerType,
+                                    false,
+                                    Metadata.empty())
+                            }), false), true, Metadata.empty())});
+
+    String expectedParquetSchema =
+        "message spark_schema {\n" +
+        "  optional group col1 (LIST) {\n" +
+        "    repeated group array {\n" +
+        "      required int32 col2;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}\n";
+
+    // generate parquet file with required schema
+    List<String> testData = Collections.singletonList("{\"col1\": [{\"col2\": 1}]}");
+    spark.read().schema(sparkSchema).json(
+        JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(testData))
+        .coalesce(1).write().format("parquet").mode(SaveMode.Append).save(location.getPath());
+
+    File parquetFile = Arrays.stream(Objects.requireNonNull(location.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith("parquet");
+      }
+    }))).findAny().get();
+
+    // verify generated parquet file has expected schema
+    ParquetFileReader pqReader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(parquetFile.getPath()),
+        new Configuration()));
+    MessageType schema = pqReader.getFooter().getFileMetaData().getSchema();
+    Assert.assertEquals(expectedParquetSchema, schema.toString());
Review comment:
I did not understand this; I am already comparing Parquet schemas here.
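
For contrast with the legacy two-level layout asserted above: with `spark.sql.parquet.writeLegacyFormat` disabled, the same Spark schema would, to my understanding, produce the standard three-level form, roughly:

```
message spark_schema {
  optional group col1 (LIST) {
    repeated group list {
      required group element {
        required int32 col2;
      }
    }
  }
}
```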
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
##########
@@ -187,13 +187,14 @@ private SparkParquetReaders() {
   @Override
   public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
                                     ParquetValueReader<?> elementReader) {
-    GroupType repeated = array.getFields().get(0).asGroupType();
+    Type repeated = array.getFields().get(0);
+    boolean isOldListElementType = ParquetSchemaUtil.isOldListElementType(repeated, array.getName());
     String[] repeatedPath = currentPath();
     int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
     int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-    Type elementType = repeated.getType(0);
+    Type elementType = isOldListElementType ? repeated : repeated.asGroupType().getType(0);
Review comment:
Added. As we discussed, I will backport this to older versions of Spark and Flink in a separate diff.