This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 25335a0d9d Parquet: Fix null values when selecting nested field partition column (#4627)
25335a0d9d is described below
commit 25335a0d9dcb0375a80abfb5920b4b43c3681330
Author: Xianyang Liu <[email protected]>
AuthorDate: Fri Nov 18 23:05:52 2022 +0800
Parquet: Fix null values when selecting nested field partition column (#4627)
---
.../iceberg/flink/data/FlinkParquetReaders.java | 11 +++-
.../iceberg/flink/source/TestFlinkInputFormat.java | 51 ++++++++++++++
.../iceberg/data/parquet/BaseParquetReaders.java | 11 +++-
.../iceberg/parquet/ParquetValueReaders.java | 45 ++++++++++++-
.../org/apache/iceberg/pig/PigParquetReader.java | 11 +++-
.../iceberg/spark/data/SparkParquetReaders.java | 11 +++-
.../apache/iceberg/spark/source/ComplexRecord.java | 74 +++++++++++++++++++++
.../apache/iceberg/spark/source/NestedRecord.java | 77 ++++++++++++++++++++++
.../iceberg/spark/source/TestPartitionValues.java | 54 +++++++++++++++
9 files changed, 339 insertions(+), 6 deletions(-)
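For context on the change below: ParquetValueReaders.constant() used to build a ConstantReader whose column() returned NullReader.NULL_COLUMN, which always reports definition level 0. When that constant stood in for a partition value nested inside an optional struct, the option reader wrapping the struct compared the child column's definition level against its own and concluded the whole struct was null. The new constant(value, definitionLevel) overload lets the constant report the field's max definition level instead. The following is a minimal, self-contained sketch of that mechanism; the Reader interface and the option()/constant() helpers are simplified stand-ins for illustration, not Iceberg's actual reader classes.

// DefinitionLevelSketch.java -- illustrative only; simplified stand-ins for
// Iceberg's ParquetValueReader/TripleIterator machinery, not the real API.
public class DefinitionLevelSketch {

  // A reader that, like a Parquet column cursor, reports the definition
  // level of the value currently under the cursor.
  interface Reader<T> {
    T read();

    int currentDefinitionLevel();
  }

  // Nullity check used by option wrappers: the child is non-null only when
  // its current definition level exceeds the wrapper's own definition level.
  static <T> Reader<T> option(int definitionLevel, Reader<T> child) {
    return new Reader<T>() {
      @Override
      public T read() {
        return child.currentDefinitionLevel() > definitionLevel ? child.read() : null;
      }

      @Override
      public int currentDefinitionLevel() {
        return child.currentDefinitionLevel();
      }
    };
  }

  // A constant reader that advertises a fixed definition level, mirroring
  // the new ParquetValueReaders.constant(value, definitionLevel) overload.
  static <T> Reader<T> constant(T value, int definitionLevel) {
    return new Reader<T>() {
      @Override
      public T read() {
        return value;
      }

      @Override
      public int currentDefinitionLevel() {
        return definitionLevel;
      }
    };
  }

  public static void main(String[] args) {
    // Before the fix: the constant's column reported definition level 0, so
    // the optional struct holding the partition value resolved to null.
    System.out.println(option(1, constant("name_0", 0)).read()); // prints: null

    // After the fix: the constant reports the field's max definition level
    // and the same check yields the partition value.
    System.out.println(option(1, constant("name_0", 2)).read()); // prints: name_0
  }
}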
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae42..ab7b1174c9 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public class FlinkParquetReaders {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
+ Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public class FlinkParquetReaders {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
+ if (idToConstant.containsKey(id)) {
+ maxDefinitionLevelsById.put(id, fieldD);
+ }
}
}
}
@@ -110,11 +114,16 @@ public class FlinkParquetReaders {
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+ // Defaulting to parent max definition level
+ int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
- reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+ reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd384..73d03710d3 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.junit.Assume;
import org.junit.Test;
/** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public class TestFlinkInputFormat extends TestFlinkSource {
TestHelpers.assertRows(result, expected);
}
+ @Test
+ public void testReadPartitionColumn() throws Exception {
+ Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+ Schema nestedSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(
+ 2,
+ "struct",
+ Types.StructType.of(
+ Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+ Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+ PartitionSpec spec =
+ PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+ List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+ for (Record record : records) {
+ org.apache.iceberg.TestHelpers.Row partition =
+ org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+ appender.appendToTable(partition, Collections.singletonList(record));
+ }
+
+ TableSchema projectedSchema =
+ TableSchema.builder()
+ .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+ .build();
+ List<Row> result =
+ runFormat(
+ FlinkSource.forRowData()
+ .tableLoader(tableLoader())
+ .project(projectedSchema)
+ .buildFormat());
+
+ List<Row> expected = Lists.newArrayList();
+ for (Record record : records) {
+ Row nested = Row.of(((Record) record.get(1)).get(1));
+ expected.add(Row.of(nested));
+ }
+
+ TestHelpers.assertRows(result, expected);
+ }
+
private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
return TestHelpers.readRows(inputFormat, rowType);
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index c96074ebfd..80eafb05ca 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -129,6 +129,7 @@ public abstract class BaseParquetReaders<T> {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
+ Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
ParquetValueReader<?> fieldReader = fieldReaders.get(i);
@@ -138,6 +139,9 @@ public abstract class BaseParquetReaders<T> {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
typesById.put(id, fieldType);
+ if (idToConstant.containsKey(id)) {
+ maxDefinitionLevelsById.put(id, fieldD);
+ }
}
}
@@ -146,11 +150,16 @@ public abstract class BaseParquetReaders<T> {
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+ // Defaulting to parent max definition level
+ int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
- reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+ reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index b995e17071..7d795b7598 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -55,6 +55,10 @@ public class ParquetValueReaders {
return new ConstantReader<>(value);
}
+ public static <C> ParquetValueReader<C> constant(C value, int definitionLevel) {
+ return new ConstantReader<>(value, definitionLevel);
+ }
+
public static ParquetValueReader<Long> position() {
return new PositionReader();
}
@@ -113,9 +117,46 @@ public class ParquetValueReaders {
static class ConstantReader<C> implements ParquetValueReader<C> {
private final C constantValue;
+ private final TripleIterator<?> column;
+ private final List<TripleIterator<?>> children;
ConstantReader(C constantValue) {
this.constantValue = constantValue;
+ this.column = NullReader.NULL_COLUMN;
+ this.children = NullReader.COLUMNS;
+ }
+
+ ConstantReader(C constantValue, int definitionLevel) {
+ this.constantValue = constantValue;
+ this.column =
+ new TripleIterator<Object>() {
+ @Override
+ public int currentDefinitionLevel() {
+ return definitionLevel;
+ }
+
+ @Override
+ public int currentRepetitionLevel() {
+ return 0;
+ }
+
+ @Override
+ public <N> N nextNull() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public Object next() {
+ return null;
+ }
+ };
+
+ this.children = ImmutableList.of(column);
}
@Override
@@ -125,12 +166,12 @@ public class ParquetValueReaders {
@Override
public TripleIterator<?> column() {
- return NullReader.NULL_COLUMN;
+ return column;
}
@Override
public List<TripleIterator<?>> columns() {
- return NullReader.COLUMNS;
+ return children;
}
@Override
diff --git a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
index 4c9e582e4c..935a99f9ab 100644
--- a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
+++ b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
@@ -136,6 +136,7 @@ public class PigParquetReader {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
+ Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -143,6 +144,9 @@ public class PigParquetReader {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
+ if (partitionValues.containsKey(id)) {
+ maxDefinitionLevelsById.put(id, fieldD);
+ }
}
List<Types.NestedField> expectedFields =
@@ -150,11 +154,16 @@ public class PigParquetReader {
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+ // Defaulting to parent max definition level
+ int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (partitionValues.containsKey(id)) {
// the value may be null so containsKey is used to check for a partition value
- reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
+ int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+ reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 4b4964f05f..3ebd8644de 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -140,6 +140,7 @@ public class SparkParquetReaders {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
+ Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -148,6 +149,9 @@ public class SparkParquetReaders {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
+ if (idToConstant.containsKey(id)) {
+ maxDefinitionLevelsById.put(id, fieldD);
+ }
}
}
@@ -156,11 +160,16 @@ public class SparkParquetReaders {
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+ // Defaulting to parent max definition level
+ int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
- reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+ reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/ComplexRecord.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/ComplexRecord.java
new file mode 100644
index 0000000000..42e8552578
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/ComplexRecord.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+public class ComplexRecord {
+ private long id;
+ private NestedRecord struct;
+
+ public ComplexRecord() {}
+
+ public ComplexRecord(long id, NestedRecord struct) {
+ this.id = id;
+ this.struct = struct;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public NestedRecord getStruct() {
+ return struct;
+ }
+
+ public void setStruct(NestedRecord struct) {
+ this.struct = struct;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ComplexRecord record = (ComplexRecord) o;
+ return id == record.id && Objects.equal(struct, record.struct);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id, struct);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("id", id).add("struct", struct).toString();
+ }
+}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/NestedRecord.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/NestedRecord.java
new file mode 100644
index 0000000000..ca36bfd493
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/NestedRecord.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+public class NestedRecord {
+ private long innerId;
+ private String innerName;
+
+ public NestedRecord() {}
+
+ public NestedRecord(long innerId, String innerName) {
+ this.innerId = innerId;
+ this.innerName = innerName;
+ }
+
+ public long getInnerId() {
+ return innerId;
+ }
+
+ public String getInnerName() {
+ return innerName;
+ }
+
+ public void setInnerId(long iId) {
+ innerId = iId;
+ }
+
+ public void setInnerName(String name) {
+ innerName = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ NestedRecord that = (NestedRecord) o;
+ return innerId == that.innerId && Objects.equal(innerName, that.innerName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(innerId, innerName);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("innerId", innerId)
+ .add("innerName", innerName)
+ .toString();
+ }
+}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index df2e4649d9..c231afd5f8 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -23,6 +23,8 @@ import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.File;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Files;
@@ -39,6 +41,7 @@ import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -51,6 +54,7 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -446,4 +450,54 @@ public class TestPartitionValues {
Assert.assertEquals("Number of rows should match", rows.size(),
actual.size());
}
+
+ @Test
+ public void testReadPartitionColumn() throws Exception {
+ Assume.assumeTrue("Temporary skip ORC", !"orc".equals(format));
+
+ Schema nestedSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(
+ 2,
+ "struct",
+ Types.StructType.of(
+ Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+ Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+ PartitionSpec spec =
+ PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+ // create table
+ HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+ String baseLocation = temp.newFolder("partition_by_nested_string").toString();
+ Table table = tables.create(nestedSchema, spec, baseLocation);
+ table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+ // write into iceberg
+ MapFunction<Long, ComplexRecord> func =
+ value -> new ComplexRecord(value, new NestedRecord(value, "name_" + value));
+ spark
+ .range(0, 10, 1, 1)
+ .map(func, Encoders.bean(ComplexRecord.class))
+ .write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(baseLocation);
+
+ List<String> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
+ .load(baseLocation)
+ .select("struct.innerName")
+ .as(Encoders.STRING())
+ .collectAsList();
+
+ Assert.assertEquals("Number of rows should match", 10, actual.size());
+
+ List<String> inputRecords =
+ IntStream.range(0, 10).mapToObj(i -> "name_" + i).collect(Collectors.toList());
+ Assert.assertEquals("Read object should be matched", inputRecords, actual);
+ }
}