This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9724aa2f77 Spark 3.3, 3.4: Support read of partition metadata column
when table is over 1k (#10641)
9724aa2f77 is described below
commit 9724aa2f77a01537f65ca2b84185cc4c43ab2ab7
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Jul 12 16:32:12 2024 -0700
Spark 3.3, 3.4: Support read of partition metadata column when table is
over 1k (#10641)
---
.../iceberg/spark/source/SparkScanBuilder.java | 49 ++++++++++++++++++++--
.../spark/source/TestSparkMetadataColumns.java | 49 ++++++++++++++++++++++
.../iceberg/spark/source/SparkScanBuilder.java | 49 ++++++++++++++++++++--
.../spark/source/TestSparkMetadataColumns.java | 47 +++++++++++++++++++++
4 files changed, 188 insertions(+), 6 deletions(-)
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index ef3138d677..afb0f434aa 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -20,6 +20,9 @@ package org.apache.iceberg.spark.source;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BaseTable;
@@ -47,6 +50,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkAggregates;
import org.apache.iceberg.spark.SparkFilters;
@@ -362,15 +366,54 @@ public class SparkScanBuilder
private Schema schemaWithMetadataColumns() {
// metadata columns
- List<Types.NestedField> fields =
+ List<Types.NestedField> metadataFields =
metaColumns.stream()
.distinct()
.map(name -> MetadataColumns.metadataColumn(table, name))
.collect(Collectors.toList());
- Schema meta = new Schema(fields);
+ Schema metadataSchema = calculateMetadataSchema(metadataFields);
// schema or rows returned by readers
- return TypeUtil.join(schema, meta);
+ return TypeUtil.join(schema, metadataSchema);
+ }
+
+ private Schema calculateMetadataSchema(List<Types.NestedField>
metaColumnFields) {
+ Optional<Types.NestedField> partitionField =
+ metaColumnFields.stream()
+ .filter(f -> MetadataColumns.PARTITION_COLUMN_ID == f.fieldId())
+ .findFirst();
+
+ // only calculate potential column id collision if partition metadata
column was requested
+ if (!partitionField.isPresent()) {
+ return new Schema(metaColumnFields);
+ }
+
+ Set<Integer> idsToReassign =
+
TypeUtil.indexById(partitionField.get().type().asStructType()).keySet();
+
+ // Calculate used ids by union metadata columns with all base table schemas
+ Set<Integer> currentlyUsedIds =
+
metaColumnFields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
+ Set<Integer> allUsedIds =
+ table.schemas().values().stream()
+ .map(currSchema ->
TypeUtil.indexById(currSchema.asStruct()).keySet())
+ .reduce(currentlyUsedIds, Sets::union);
+
+ // Reassign selected ids to deduplicate with used ids.
+ AtomicInteger nextId = new AtomicInteger();
+ return new Schema(
+ metaColumnFields,
+ table.schema().identifierFieldIds(),
+ oldId -> {
+ if (!idsToReassign.contains(oldId)) {
+ return oldId;
+ }
+ int candidate = nextId.incrementAndGet();
+ while (allUsedIds.contains(candidate)) {
+ candidate = nextId.incrementAndGet();
+ }
+ return candidate;
+ });
}
@Override
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index e24e74383b..127b0eb66f 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -24,6 +24,7 @@ import static
org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
import static org.apache.iceberg.TableProperties.PARQUET_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
@@ -53,6 +55,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -169,6 +172,52 @@ public class TestSparkMetadataColumns extends
SparkTestBase {
sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id",
TABLE_NAME));
}
+ @Test
+ public void testPartitionMetadataColumnWithManyColumns() {
+ // TODO: support metadata structs in vectorized ORC reads
+ Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized);
+ List<Types.NestedField> fields =
+ Lists.newArrayList(Types.NestedField.required(0, "id",
Types.LongType.get()));
+ List<Types.NestedField> additionalCols =
+ IntStream.range(1, 1010)
+ .mapToObj(i -> Types.NestedField.optional(i, "c" + i,
Types.StringType.get()))
+ .collect(Collectors.toList());
+ fields.addAll(additionalCols);
+ Schema manyColumnsSchema = new Schema(fields);
+ PartitionSpec spec =
PartitionSpec.builderFor(manyColumnsSchema).identity("id").build();
+
+ TableOperations ops = ((HasTableOperations) table).operations();
+ TableMetadata base = ops.current();
+ ops.commit(
+ base,
+ base.updateSchema(manyColumnsSchema,
manyColumnsSchema.highestFieldId())
+ .updatePartitionSpec(spec));
+
+ Dataset<Row> df =
+ spark
+ .range(2)
+ .withColumns(
+ IntStream.range(1, 1010)
+ .boxed()
+ .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id
as STRING)"))));
+ StructType sparkSchema = spark.table(TABLE_NAME).schema();
+ spark
+ .createDataFrame(df.rdd(), sparkSchema)
+ .coalesce(1)
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_NAME);
+
+ Assert.assertEquals(2, spark.table(TABLE_NAME).select("*",
"_partition").count());
+ List<Object[]> expected =
+ ImmutableList.of(row(row(0L), 0L, "0", "0", "0"), row(row(1L), 1L,
"1", "1", "1"));
+ assertEquals(
+ "Rows must match",
+ expected,
+ sql("SELECT _partition, id, c999, c1000, c1001 FROM %s ORDER BY id",
TABLE_NAME));
+ }
+
@Test
public void testPositionMetadataColumnWithMultipleRowGroups() throws
NoSuchTableException {
Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 6b97e48133..9dc214a755 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -20,6 +20,9 @@ package org.apache.iceberg.spark.source;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BaseTable;
@@ -48,6 +51,7 @@ import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkAggregates;
import org.apache.iceberg.spark.SparkReadConf;
@@ -342,15 +346,54 @@ public class SparkScanBuilder
private Schema schemaWithMetadataColumns() {
// metadata columns
- List<Types.NestedField> fields =
+ List<Types.NestedField> metadataFields =
metaColumns.stream()
.distinct()
.map(name -> MetadataColumns.metadataColumn(table, name))
.collect(Collectors.toList());
- Schema meta = new Schema(fields);
+ Schema metadataSchema = calculateMetadataSchema(metadataFields);
// schema or rows returned by readers
- return TypeUtil.join(schema, meta);
+ return TypeUtil.join(schema, metadataSchema);
+ }
+
+ private Schema calculateMetadataSchema(List<Types.NestedField>
metaColumnFields) {
+ Optional<Types.NestedField> partitionField =
+ metaColumnFields.stream()
+ .filter(f -> MetadataColumns.PARTITION_COLUMN_ID == f.fieldId())
+ .findFirst();
+
+ // only calculate potential column id collision if partition metadata
column was requested
+ if (!partitionField.isPresent()) {
+ return new Schema(metaColumnFields);
+ }
+
+ Set<Integer> idsToReassign =
+
TypeUtil.indexById(partitionField.get().type().asStructType()).keySet();
+
+ // Calculate used ids by union metadata columns with all base table schemas
+ Set<Integer> currentlyUsedIds =
+
metaColumnFields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
+ Set<Integer> allUsedIds =
+ table.schemas().values().stream()
+ .map(currSchema ->
TypeUtil.indexById(currSchema.asStruct()).keySet())
+ .reduce(currentlyUsedIds, Sets::union);
+
+ // Reassign selected ids to deduplicate with used ids.
+ AtomicInteger nextId = new AtomicInteger();
+ return new Schema(
+ metaColumnFields,
+ table.schema().identifierFieldIds(),
+ oldId -> {
+ if (!idsToReassign.contains(oldId)) {
+ return oldId;
+ }
+ int candidate = nextId.incrementAndGet();
+ while (allUsedIds.contains(candidate)) {
+ candidate = nextId.incrementAndGet();
+ }
+ return candidate;
+ });
}
@Override
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index 778c46bba6..0ba34a638a 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -24,6 +24,7 @@ import static
org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
import static org.apache.iceberg.TableProperties.PARQUET_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
@@ -53,6 +55,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -169,6 +172,50 @@ public class TestSparkMetadataColumns extends
SparkTestBase {
sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id",
TABLE_NAME));
}
+ @Test
+ public void testPartitionMetadataColumnWithManyColumns() {
+ List<Types.NestedField> fields =
+ Lists.newArrayList(Types.NestedField.required(0, "id",
Types.LongType.get()));
+ List<Types.NestedField> additionalCols =
+ IntStream.range(1, 1010)
+ .mapToObj(i -> Types.NestedField.optional(i, "c" + i,
Types.StringType.get()))
+ .collect(Collectors.toList());
+ fields.addAll(additionalCols);
+ Schema manyColumnsSchema = new Schema(fields);
+ PartitionSpec spec =
PartitionSpec.builderFor(manyColumnsSchema).identity("id").build();
+
+ TableOperations ops = ((HasTableOperations) table).operations();
+ TableMetadata base = ops.current();
+ ops.commit(
+ base,
+ base.updateSchema(manyColumnsSchema,
manyColumnsSchema.highestFieldId())
+ .updatePartitionSpec(spec));
+
+ Dataset<Row> df =
+ spark
+ .range(2)
+ .withColumns(
+ IntStream.range(1, 1010)
+ .boxed()
+ .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id
as STRING)"))));
+ StructType sparkSchema = spark.table(TABLE_NAME).schema();
+ spark
+ .createDataFrame(df.rdd(), sparkSchema)
+ .coalesce(1)
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_NAME);
+
+ Assert.assertEquals(2, spark.table(TABLE_NAME).select("*",
"_partition").count());
+ List<Object[]> expected =
+ ImmutableList.of(row(row(0L), 0L, "0", "0", "0"), row(row(1L), 1L,
"1", "1", "1"));
+ assertEquals(
+ "Rows must match",
+ expected,
+ sql("SELECT _partition, id, c999, c1000, c1001 FROM %s ORDER BY id",
TABLE_NAME));
+ }
+
@Test
public void testPositionMetadataColumnWithMultipleRowGroups() throws
NoSuchTableException {
Assume.assumeTrue(fileFormat == FileFormat.PARQUET);