ConeyLiu commented on code in PR #4627:
URL: https://github.com/apache/iceberg/pull/4627#discussion_r872923557
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java:
##########
@@ -112,11 +116,14 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
     List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size());
     List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+    // Inferring MaxDefinitionLevel from parent field
+    int inferredMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
Review Comment:
@kbendick, we cannot update `maxDefinitionLevelsById` when `fieldType.getId()` is null; you can see this at L101. That is why the max definition level is also inferred from the parent field here.
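For illustration, a minimal self-contained sketch (not the PR's code) of the fallback this enables, assuming a `maxDefinitionLevelsById` map keyed by field id and an `inferredMaxDefinitionLevel` computed from the parent path:

```java
import java.util.HashMap;
import java.util.Map;

public class MaxDefinitionLevelFallback {
  public static void main(String[] args) {
    // Populated only for fields whose Parquet type carries an id
    // (fieldType.getId() != null), mirroring the update at L101.
    Map<Integer, Integer> maxDefinitionLevelsById = new HashMap<>();
    maxDefinitionLevelsById.put(1, 2);

    // Stand-in for type.getMaxDefinitionLevel(currentPath()).
    int inferredMaxDefinitionLevel = 1;

    // Field 4 never made it into the map (its id was null), so the
    // reader falls back to the level inferred from the parent field.
    int maxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(4, inferredMaxDefinitionLevel);
    System.out.println(maxDefinitionLevel); // prints 1
  }
}
```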
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java:
##########
@@ -432,4 +436,48 @@ public void testPartitionedByNestedString() throws Exception {
     Assert.assertEquals("Number of rows should match", rows.size(), actual.size());
   }
+
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", !"orc".equals(format));
+
+    Schema nestedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "struct",
+            Types.StructType.of(
+                Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                Types.NestedField.optional(4, "innerName", Types.StringType.get())
+            )
+        )
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    // create table
+    HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    String baseLocation = temp.newFolder("partition_by_nested_string").toString();
+    Table table = tables.create(nestedSchema, spec, baseLocation);
+    table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+    // write into iceberg
+    MapFunction<Long, ComplexRecord> func =
+        value -> new ComplexRecord(value, new NestedRecord(value, "name_" + value));
+    spark.range(0, 10, 1, 1).map(func, Encoders.bean(ComplexRecord.class))
+        .write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(baseLocation);
Review Comment:
Updated
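For context, a hypothetical read-back sketch of what a test like this verifies after the write above; it is not the PR's actual assertion code, and it assumes the surrounding test class's `spark` session, `baseLocation`, and JUnit imports:

```java
// Read the identity-partitioned nested column back through the Iceberg source.
List<Row> actual = spark.read()
    .format("iceberg")
    .load(baseLocation)
    .select("struct.innerName")
    .orderBy("struct.innerName")
    .collectAsList();
Assert.assertEquals("Should read one row per written record", 10, actual.size());
```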