This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 25335a0d9d Parquet: Fix null values when selecting nested field partition column (#4627)
25335a0d9d is described below

commit 25335a0d9dcb0375a80abfb5920b4b43c3681330
Author: Xianyang Liu <[email protected]>
AuthorDate: Fri Nov 18 23:05:52 2022 +0800

    Parquet: Fix null values when selecting nested field partition column (#4627)
---
 .../iceberg/flink/data/FlinkParquetReaders.java    | 11 +++-
 .../iceberg/flink/source/TestFlinkInputFormat.java | 51 ++++++++++++++
 .../iceberg/data/parquet/BaseParquetReaders.java   | 11 +++-
 .../iceberg/parquet/ParquetValueReaders.java       | 45 ++++++++++++-
 .../org/apache/iceberg/pig/PigParquetReader.java   | 11 +++-
 .../iceberg/spark/data/SparkParquetReaders.java    | 11 +++-
 .../apache/iceberg/spark/source/ComplexRecord.java | 74 +++++++++++++++++++++
 .../apache/iceberg/spark/source/NestedRecord.java  | 77 ++++++++++++++++++++++
 .../iceberg/spark/source/TestPartitionValues.java  | 54 +++++++++++++++
 9 files changed, 339 insertions(+), 6 deletions(-)
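
Background on the fix: ParquetValueReaders.ConstantReader previously exposed no column (NullReader.NULL_COLUMN, whose current definition level is 0). When a projected struct contained nothing but a constant partition field, the enclosing option reader compared that level 0 against its own definition level and concluded the whole struct was null. The patch records each constant field's max definition level and hands it to the constant reader, whose synthetic column now reports it. Below is a minimal, self-contained sketch of that presence check; the class and method names are illustrative stand-ins, not Iceberg's actual internals:

    import java.util.function.IntSupplier;

    public class DefinitionLevelSketch {
      // Stand-in for an option reader: a value is present only when the child
      // column's current definition level exceeds the reader's own level.
      static String readOptional(int parentLevel, IntSupplier childColumn, String value) {
        return childColumn.getAsInt() > parentLevel ? value : null;
      }

      public static void main(String[] args) {
        // Before the fix: the constant column reported level 0, so an optional
        // struct holding only a partition constant read back as null.
        System.out.println(readOptional(1, () -> 0, "name_0")); // prints null
        // After the fix: the constant column reports the field's max
        // definition level, so the struct is treated as present.
        System.out.println(readOptional(1, () -> 2, "name_0")); // prints name_0
      }
    }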

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae42..ab7b1174c9 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public class FlinkParquetReaders {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public class FlinkParquetReaders {
             int id = fieldType.getId().intValue();
            readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
             typesById.put(id, fieldType);
+            if (idToConstant.containsKey(id)) {
+              maxDefinitionLevelsById.put(id, fieldD);
+            }
           }
         }
       }
@@ -110,11 +114,16 @@ public class FlinkParquetReaders {
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
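
The same two-step change (record each constant field's max definition level while building the readers, then pass it to ParquetValueReaders.constant) is repeated in BaseParquetReaders, PigParquetReader, and SparkParquetReaders below.
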
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd384..73d03710d3 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@ import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.Test;
 
 /** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public class TestFlinkInputFormat extends TestFlinkSource {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+    Schema nestedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                    Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+    PartitionSpec spec =
+        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+    List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    for (Record record : records) {
+      org.apache.iceberg.TestHelpers.Row partition =
+          org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+      appender.appendToTable(partition, Collections.singletonList(record));
+    }
+
+    TableSchema projectedSchema =
+        TableSchema.builder()
+            .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", 
DataTypes.STRING())))
+            .build();
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .project(projectedSchema)
+                .buildFormat());
+
+    List<Row> expected = Lists.newArrayList();
+    for (Record record : records) {
+      Row nested = Row.of(((Record) record.get(1)).get(1));
+      expected.add(Row.of(nested));
+    }
+
+    TestHelpers.assertRows(result, expected);
+  }
+
   private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
     RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
     return TestHelpers.readRows(inputFormat, rowType);
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index c96074ebfd..80eafb05ca 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -129,6 +129,7 @@ public abstract class BaseParquetReaders<T> {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         ParquetValueReader<?> fieldReader = fieldReaders.get(i);
@@ -138,6 +139,9 @@ public abstract class BaseParquetReaders<T> {
           int id = fieldType.getId().intValue();
          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
           typesById.put(id, fieldType);
+          if (idToConstant.containsKey(id)) {
+            maxDefinitionLevelsById.put(id, fieldD);
+          }
         }
       }
 
@@ -146,11 +150,16 @@ public abstract class BaseParquetReaders<T> {
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index b995e17071..7d795b7598 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -55,6 +55,10 @@ public class ParquetValueReaders {
     return new ConstantReader<>(value);
   }
 
+  public static <C> ParquetValueReader<C> constant(C value, int definitionLevel) {
+    return new ConstantReader<>(value, definitionLevel);
+  }
+
   public static ParquetValueReader<Long> position() {
     return new PositionReader();
   }
@@ -113,9 +117,46 @@ public class ParquetValueReaders {
 
   static class ConstantReader<C> implements ParquetValueReader<C> {
     private final C constantValue;
+    private final TripleIterator<?> column;
+    private final List<TripleIterator<?>> children;
 
     ConstantReader(C constantValue) {
       this.constantValue = constantValue;
+      this.column = NullReader.NULL_COLUMN;
+      this.children = NullReader.COLUMNS;
+    }
+
+    ConstantReader(C constantValue, int definitionLevel) {
+      this.constantValue = constantValue;
+      this.column =
+          new TripleIterator<Object>() {
+            @Override
+            public int currentDefinitionLevel() {
+              return definitionLevel;
+            }
+
+            @Override
+            public int currentRepetitionLevel() {
+              return 0;
+            }
+
+            @Override
+            public <N> N nextNull() {
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return false;
+            }
+
+            @Override
+            public Object next() {
+              return null;
+            }
+          };
+
+      this.children = ImmutableList.of(column);
     }
 
     @Override
@@ -125,12 +166,12 @@ public class ParquetValueReaders {
 
     @Override
     public TripleIterator<?> column() {
-      return NullReader.NULL_COLUMN;
+      return column;
     }
 
     @Override
     public List<TripleIterator<?>> columns() {
-      return NullReader.COLUMNS;
+      return children;
     }
 
     @Override
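
A design note on the hunk above: the anonymous TripleIterator is deliberately inert. It reports the supplied definition level and a repetition level of 0, while hasNext() returns false and next()/nextNull() return null, so the constant reader still consumes nothing from the file; only the presence signal seen by enclosing option and struct readers changes.
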
diff --git a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
index 4c9e582e4c..935a99f9ab 100644
--- a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
+++ b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
@@ -136,6 +136,7 @@ public class PigParquetReader {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -143,6 +144,9 @@ public class PigParquetReader {
         int id = fieldType.getId().intValue();
        readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
         typesById.put(id, fieldType);
+        if (partitionValues.containsKey(id)) {
+          maxDefinitionLevelsById.put(id, fieldD);
+        }
       }
 
       List<Types.NestedField> expectedFields =
@@ -150,11 +154,16 @@ public class PigParquetReader {
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (partitionValues.containsKey(id)) {
          // the value may be null so containsKey is used to check for a partition value
-          reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(partitionValues.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else {
           ParquetValueReader<?> reader = readersById.get(id);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 4b4964f05f..3ebd8644de 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -140,6 +140,7 @@ public class SparkParquetReaders {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -148,6 +149,9 @@ public class SparkParquetReaders {
           int id = fieldType.getId().intValue();
          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
           typesById.put(id, fieldType);
+          if (idToConstant.containsKey(id)) {
+            maxDefinitionLevelsById.put(id, fieldD);
+          }
         }
       }
 
@@ -156,11 +160,16 @@ public class SparkParquetReaders {
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/ComplexRecord.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/ComplexRecord.java
new file mode 100644
index 0000000000..42e8552578
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/ComplexRecord.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+public class ComplexRecord {
+  private long id;
+  private NestedRecord struct;
+
+  public ComplexRecord() {}
+
+  public ComplexRecord(long id, NestedRecord struct) {
+    this.id = id;
+    this.struct = struct;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public void setId(long id) {
+    this.id = id;
+  }
+
+  public NestedRecord getStruct() {
+    return struct;
+  }
+
+  public void setStruct(NestedRecord struct) {
+    this.struct = struct;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ComplexRecord record = (ComplexRecord) o;
+    return id == record.id && Objects.equal(struct, record.struct);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(id, struct);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("id", id).add("struct", struct).toString();
+  }
+}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/NestedRecord.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/NestedRecord.java
new file mode 100644
index 0000000000..ca36bfd493
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/NestedRecord.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+public class NestedRecord {
+  private long innerId;
+  private String innerName;
+
+  public NestedRecord() {}
+
+  public NestedRecord(long innerId, String innerName) {
+    this.innerId = innerId;
+    this.innerName = innerName;
+  }
+
+  public long getInnerId() {
+    return innerId;
+  }
+
+  public String getInnerName() {
+    return innerName;
+  }
+
+  public void setInnerId(long iId) {
+    innerId = iId;
+  }
+
+  public void setInnerName(String name) {
+    innerName = name;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    NestedRecord that = (NestedRecord) o;
+    return innerId == that.innerId && Objects.equal(innerName, that.innerName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(innerId, innerName);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("innerId", innerId)
+        .add("innerName", innerName)
+        .toString();
+  }
+}
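
ComplexRecord and NestedRecord are written as plain Java beans (no-arg constructor plus getters and setters) so that Encoders.bean in the test below can derive a Spark schema matching the Iceberg table's nested layout.
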
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index df2e4649d9..c231afd5f8 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -23,6 +23,8 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.Files;
@@ -39,6 +41,7 @@ import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -51,6 +54,7 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -446,4 +450,54 @@ public class TestPartitionValues {
 
     Assert.assertEquals("Number of rows should match", rows.size(), 
actual.size());
   }
+
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", !"orc".equals(format));
+
+    Schema nestedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                    Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+    PartitionSpec spec =
+        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    // create table
+    HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    String baseLocation = temp.newFolder("partition_by_nested_string").toString();
+    Table table = tables.create(nestedSchema, spec, baseLocation);
+    table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+    // write into iceberg
+    MapFunction<Long, ComplexRecord> func =
+        value -> new ComplexRecord(value, new NestedRecord(value, "name_" + value));
+    spark
+        .range(0, 10, 1, 1)
+        .map(func, Encoders.bean(ComplexRecord.class))
+        .write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(baseLocation);
+
+    List<String> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
+            .load(baseLocation)
+            .select("struct.innerName")
+            .as(Encoders.STRING())
+            .collectAsList();
+
+    Assert.assertEquals("Number of rows should match", 10, actual.size());
+
+    List<String> inputRecords =
+        IntStream.range(0, 10).mapToObj(i -> "name_" + i).collect(Collectors.toList());
+    Assert.assertEquals("Read object should be matched", inputRecords, actual);
+  }
 }
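
The Spark test mirrors the Flink one: rows are written through Encoders.bean(ComplexRecord.class) into a table partitioned by the nested struct.innerName field, and the projected read must return the partition values name_0 through name_9 rather than nulls, with and without vectorization. ORC is skipped in both tests for now, as the Assume calls note.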
