This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

The following commit(s) were added to refs/heads/main by this push:
     new 8134815a3a  API, Core, Spark: Ignore partition fields that are dropped from the current-schema (#11868)
8134815a3a is described below

commit 8134815a3a04f3139fccf55a7e02c336e7a1a6e8
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Wed Jul 2 17:02:34 2025 +0200

    API, Core, Spark: Ignore partition fields that are dropped from the current-schema (#11868)

    Co-authored-by: Nynke Gaikema <nynkegaikema@MacBook-Air-van-Nynke.local>
---
 .../java/org/apache/iceberg/PartitionSpec.java     | 20 +++++++++-
 .../org/apache/iceberg/UnboundPartitionSpec.java   |  4 ++
 .../org/apache/iceberg/PartitionSpecParser.java    |  2 +-
 .../main/java/org/apache/iceberg/Partitioning.java |  8 ++--
 .../java/org/apache/iceberg/TestPartitioning.java  | 30 +++++++++++++-
 .../extensions/TestAlterTablePartitionFields.java  | 46 ++++++++++++++++++++++
 6 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 2f9d5abe59..f059c928a9 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -132,6 +132,12 @@ public class PartitionSpec implements Serializable {
     for (PartitionField field : fields) {
       Type sourceType = schema.findType(field.sourceId());
       Type resultType = field.transform().getResultType(sourceType);
+
+      // When the source field has been dropped we cannot determine the type
+      if (sourceType == null) {
+        resultType = Types.UnknownType.get();
+      }
+
       structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
     }
 
@@ -614,8 +620,12 @@ public class PartitionSpec implements Serializable {
     }
 
     public PartitionSpec build() {
+      return build(false);
+    }
+
+    public PartitionSpec build(boolean allowMissingFields) {
       PartitionSpec spec = buildUnchecked();
-      checkCompatibility(spec, schema);
+      checkCompatibility(spec, schema, allowMissingFields);
       return spec;
     }
 
@@ -625,10 +635,18 @@ public class PartitionSpec implements Serializable {
   }
 
   static void checkCompatibility(PartitionSpec spec, Schema schema) {
+    checkCompatibility(spec, schema, false);
+  }
+
+  static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) {
     final Map<Integer, Integer> parents = TypeUtil.indexParents(schema.asStruct());
     for (PartitionField field : spec.fields) {
       Type sourceType = schema.findType(field.sourceId());
       Transform<?, ?> transform = field.transform();
+      // In the case the underlying field is dropped, we cannot check if they are compatible
+      if (allowMissingFields && sourceType == null) {
+        continue;
+      }
       // In the case of a Version 1 partition-spec field gets deleted,
       // it is replaced with a void transform, see:
       // https://iceberg.apache.org/spec/#partition-transforms
diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
index cc8526f907..30b3cce35f 100644
--- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
@@ -46,6 +46,10 @@ public class UnboundPartitionSpec {
     return copyToBuilder(schema).build();
   }
 
+  public PartitionSpec bind(Schema schema, boolean ignoreMissingFields) {
+    return copyToBuilder(schema).build(ignoreMissingFields);
+  }
+
   PartitionSpec bindUnchecked(Schema schema) {
     return copyToBuilder(schema).buildUnchecked();
   }
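For readers following along, here is a minimal sketch of what the new bind overload enables. The schema, field ids, and spec JSON below are illustrative, not taken from the patch, and it assumes PartitionSpecParser.fromJson(String) returning an UnboundPartitionSpec, as in current Iceberg:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.PartitionSpecParser;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.UnboundPartitionSpec;
    import org.apache.iceberg.types.Types;

    public class BindDroppedFieldExample {
      public static void main(String[] args) {
        // Current schema: the "category" column (field id 2) has already been dropped.
        Schema current = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

        // A historical spec, e.g. read back from table metadata, still references id 2.
        UnboundPartitionSpec unbound =
            PartitionSpecParser.fromJson(
                "{\"spec-id\": 1, \"fields\": ["
                    + "{\"name\": \"category\", \"transform\": \"identity\","
                    + " \"source-id\": 2, \"field-id\": 1000}]}");

        // Binding with ignoreMissingFields = true skips the compatibility check for the
        // dropped source field instead of throwing; the partition type of that field
        // falls back to Types.UnknownType because its source type can no longer be found.
        PartitionSpec spec = unbound.bind(current, true);
        System.out.println(spec.partitionType());
      }
    }
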
diff --git a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
index a51b03c8f0..7becf0c629 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
@@ -68,7 +68,7 @@ public class PartitionSpecParser {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    return fromJson(json).bind(schema);
+    return fromJson(json).bind(schema, true);
   }
 
   public static UnboundPartitionSpec fromJson(JsonNode json) {
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 832e0b59fe..c708d39f52 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -239,7 +239,8 @@ public class Partitioning {
    */
   public static StructType partitionType(Table table) {
     Collection<PartitionSpec> specs = table.specs().values();
-    return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
+    return buildPartitionProjectionType(
+        "table partition", specs, allActiveFieldIds(table.schema(), specs));
   }
 
   /**
@@ -346,10 +347,11 @@ public class Partitioning {
         || t2.equals(Transforms.alwaysNull());
   }
 
-  // collects IDs of all partition field used across specs
-  private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
+  // collects IDs of all partition field used across specs that are in the current schema
+  private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
     return FluentIterable.from(specs)
         .transformAndConcat(PartitionSpec::fields)
+        .filter(field -> schema.findField(field.sourceId()) != null)
        .transform(PartitionField::fieldId)
         .toSet();
   }
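The core of the Partitioning change is the new allActiveFieldIds filter. Restated with plain java.util.stream instead of the relocated Guava FluentIterable, the logic is roughly:

    import java.util.Collection;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;

    class ActiveFieldIds {
      // Only partition fields whose source column still exists in the current
      // schema contribute to the unified partition type.
      static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
        return specs.stream()
            .flatMap(spec -> spec.fields().stream())
            .filter(field -> schema.findField(field.sourceId()) != null)
            .map(PartitionField::fieldId)
            .collect(Collectors.toSet());
      }
    }

The effect is that a partition field whose source column no longer resolves in the current schema simply drops out of the unified partition type instead of failing the projection.
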
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index da04e67bdd..eb77a693c7 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -172,7 +172,7 @@ public class TestPartitioning {
     PartitionSpec newSpec =
         PartitionSpec.builderFor(table.schema()).identity("category").build();
 
-    TableOperations ops = ((HasTableOperations) table).operations();
+    TableOperations ops = table.operations();
     TableMetadata current = ops.current();
     ops.commit(current, current.updatePartitionSpec(newSpec));
 
@@ -183,6 +183,34 @@ public class TestPartitioning {
         .hasMessageStartingWith("Conflicting partition fields");
   }
 
+  @Test
+  public void testPartitionTypeIgnoreInactiveFields() {
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", SCHEMA, BY_DATA_CATEGORY_BUCKET_SPEC, V2_FORMAT_VERSION);
+
+    StructType actualType = Partitioning.partitionType(table);
+    assertThat(actualType)
+        .isEqualTo(
+            StructType.of(
+                NestedField.optional(1000, "data", Types.StringType.get()),
+                NestedField.optional(1001, "category_bucket", Types.IntegerType.get())));
+
+    // Create a new spec, and drop the field of the old spec
+    table.updateSpec().removeField("category_bucket").commit();
+    table.updateSchema().deleteColumn("category").commit();
+
+    actualType = Partitioning.partitionType(table);
+    assertThat(actualType)
+        .isEqualTo(StructType.of(NestedField.optional(1000, "data", Types.StringType.get())));
+
+    table.updateSpec().removeField("data").commit();
+    table.updateSchema().deleteColumn("data").commit();
+
+    actualType = Partitioning.partitionType(table);
+    assertThat(actualType).isEqualTo(StructType.of());
+  }
+
   @Test
   public void testGroupingKeyTypeWithSpecEvolutionInV1Tables() {
     TestTables.TestTable table =
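The same evolution the test performs can be reproduced end to end through the public API. A runnable sketch, assuming iceberg-core and Hadoop are on the classpath; the table location, schema, and column names are illustrative:

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.types.Types;

    public class PartitionTypeAfterDropDemo {
      public static void main(String[] args) {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "category", Types.StringType.get()));
        // Default field name for a bucket transform is "<source>_bucket".
        PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("category", 16).build();

        HadoopTables tables = new HadoopTables(new Configuration());
        Table table =
            tables.create(
                schema,
                spec,
                Collections.singletonMap("format-version", "2"),
                "file:///tmp/iceberg_partition_type_demo");

        // Evolve: drop the partition field, then the source column itself. The old
        // spec still references the dropped column.
        table.updateSpec().removeField("category_bucket").commit();
        table.updateSchema().deleteColumn("category").commit();

        // partitionType() now skips partition fields whose source column is gone
        // instead of failing; here it prints an empty struct.
        System.out.println(Partitioning.partitionType(table));
      }
    }
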
"col_long", expected, predicate); + runCreateAndDropPartitionField("col_long", "truncate(2, col_long)", expected, predicate); + runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expected, predicate); + } }