This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8134815a3a API, Core, Spark: Ignore partition fields that are dropped from the current-schema (#11868)
8134815a3a is described below

commit 8134815a3a04f3139fccf55a7e02c336e7a1a6e8
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Wed Jul 2 17:02:34 2025 +0200

    API, Core, Spark: Ignore partition fields that are dropped from the current-schema (#11868)
    
    Co-authored-by: Nynke Gaikema <nynkegaikema@MacBook-Air-van-Nynke.local>
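
    In API terms, the fix means the following sequence no longer breaks
    partition-type computation. A minimal Java sketch, assuming `table` is
    a v2 table partitioned by identity("data") and bucket("category"), as
    in the new TestPartitioning case below:

        import org.apache.iceberg.Partitioning;
        import org.apache.iceberg.Table;
        import org.apache.iceberg.types.Types;

        // Remove the partition field, then drop its source column.
        table.updateSpec().removeField("category_bucket").commit();
        table.updateSchema().deleteColumn("category").commit();

        // The orphaned field is now ignored, because its source column no
        // longer exists in the current schema:
        Types.StructType unified = Partitioning.partitionType(table);
        // => struct<1000: data: optional string>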
---
 .../java/org/apache/iceberg/PartitionSpec.java     | 20 +++++++++-
 .../org/apache/iceberg/UnboundPartitionSpec.java   |  4 ++
 .../org/apache/iceberg/PartitionSpecParser.java    |  2 +-
 .../main/java/org/apache/iceberg/Partitioning.java |  8 ++--
 .../java/org/apache/iceberg/TestPartitioning.java  | 30 +++++++++++++-
 .../extensions/TestAlterTablePartitionFields.java  | 46 ++++++++++++++++++++++
 6 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 2f9d5abe59..f059c928a9 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -132,6 +132,12 @@ public class PartitionSpec implements Serializable {
           for (PartitionField field : fields) {
             Type sourceType = schema.findType(field.sourceId());
             Type resultType = field.transform().getResultType(sourceType);
+
+            // When the source field has been dropped, we cannot determine the type
+            if (sourceType == null) {
+              resultType = Types.UnknownType.get();
+            }
+
             structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
           }
 
@@ -614,8 +620,12 @@ public class PartitionSpec implements Serializable {
     }
 
     public PartitionSpec build() {
+      return build(false);
+    }
+
+    public PartitionSpec build(boolean allowMissingFields) {
       PartitionSpec spec = buildUnchecked();
-      checkCompatibility(spec, schema);
+      checkCompatibility(spec, schema, allowMissingFields);
       return spec;
     }
 
@@ -625,10 +635,18 @@ public class PartitionSpec implements Serializable {
   }
 
   static void checkCompatibility(PartitionSpec spec, Schema schema) {
+    checkCompatibility(spec, schema, false);
+  }
+
+  static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) {
     final Map<Integer, Integer> parents = TypeUtil.indexParents(schema.asStruct());
     for (PartitionField field : spec.fields) {
       Type sourceType = schema.findType(field.sourceId());
       Transform<?, ?> transform = field.transform();
+      // If the underlying field has been dropped, we cannot check compatibility
+      if (allowMissingFields && sourceType == null) {
+        continue;
+      }
       // When a Version 1 partition-spec field gets deleted,
       // it is replaced with a void transform, see:
       // https://iceberg.apache.org/spec/#partition-transforms
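
For callers of PartitionSpec directly, the observable change is that an
orphaned field now surfaces with an unknown type instead of failing. A
hedged sketch (assumes `spec` contains a field "category_bucket" whose
source column has since been dropped from the schema it was bound against):

    // partitionType() keeps the orphaned field in the struct, but its
    // result type can no longer be derived, so it is reported as unknown:
    Types.StructType struct = spec.partitionType();
    // => struct<..., 1001: category_bucket: optional unknown>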
diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
index cc8526f907..30b3cce35f 100644
--- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
@@ -46,6 +46,10 @@ public class UnboundPartitionSpec {
     return copyToBuilder(schema).build();
   }
 
+  public PartitionSpec bind(Schema schema, boolean ignoreMissingFields) {
+    return copyToBuilder(schema).build(ignoreMissingFields);
+  }
+
   PartitionSpec bindUnchecked(Schema schema) {
     return copyToBuilder(schema).buildUnchecked();
   }
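
The new overload is the opt-in point for lenient binding. A minimal
sketch (`unboundSpec` and `currentSchema` are placeholders):

    // Strict binding (unchanged): validation fails if a partition
    // field's source column is missing from the schema.
    PartitionSpec strict = unboundSpec.bind(currentSchema);

    // Lenient binding: skips the compatibility check for fields whose
    // source column has been dropped.
    PartitionSpec lenient = unboundSpec.bind(currentSchema, true);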
diff --git a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
index a51b03c8f0..7becf0c629 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
@@ -68,7 +68,7 @@ public class PartitionSpecParser {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    return fromJson(json).bind(schema);
+    return fromJson(json).bind(schema, true);
   }
 
   public static UnboundPartitionSpec fromJson(JsonNode json) {
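
Because table-metadata parsing goes through fromJson(Schema, JsonNode),
loading a table now tolerates historical specs that reference dropped
columns. Roughly (`currentSchema` and `specJson` are illustrative names):

    // Binds with ignoreMissingFields = true, so an old spec whose source
    // column is gone from the current schema parses without error:
    PartitionSpec spec = PartitionSpecParser.fromJson(currentSchema, specJson);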
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 832e0b59fe..c708d39f52 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -239,7 +239,8 @@ public class Partitioning {
    */
   public static StructType partitionType(Table table) {
     Collection<PartitionSpec> specs = table.specs().values();
-    return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
+    return buildPartitionProjectionType(
+        "table partition", specs, allActiveFieldIds(table.schema(), specs));
   }
 
   /**
@@ -346,10 +347,11 @@ public class Partitioning {
         || t2.equals(Transforms.alwaysNull());
   }
 
-  // collects IDs of all partition field used across specs
-  private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
+  // collects IDs of all partition fields used across specs whose source columns are in the current schema
+  private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
     return FluentIterable.from(specs)
         .transformAndConcat(PartitionSpec::fields)
+        .filter(field -> schema.findField(field.sourceId()) != null)
         .transform(PartitionField::fieldId)
         .toSet();
   }
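
For readers less familiar with the relocated Guava FluentIterable, the
same filter can be expressed over the public API with java.util.stream;
a sketch, assuming `table` is in scope:

    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.iceberg.PartitionField;

    // Keep only partition fields whose source column still resolves
    // against the current schema, across all historical specs:
    Set<Integer> activeFieldIds =
        table.specs().values().stream()
            .flatMap(spec -> spec.fields().stream())
            .filter(field -> table.schema().findField(field.sourceId()) != null)
            .map(PartitionField::fieldId)
            .collect(Collectors.toSet());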
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index da04e67bdd..eb77a693c7 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -172,7 +172,7 @@ public class TestPartitioning {
 
     PartitionSpec newSpec = PartitionSpec.builderFor(table.schema()).identity("category").build();
 
-    TableOperations ops = ((HasTableOperations) table).operations();
+    TableOperations ops = table.operations();
     TableMetadata current = ops.current();
     ops.commit(current, current.updatePartitionSpec(newSpec));
 
@@ -183,6 +183,34 @@ public class TestPartitioning {
         .hasMessageStartingWith("Conflicting partition fields");
   }
 
+  @Test
+  public void testPartitionTypeIgnoreInactiveFields() {
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", SCHEMA, BY_DATA_CATEGORY_BUCKET_SPEC, V2_FORMAT_VERSION);
+
+    StructType actualType = Partitioning.partitionType(table);
+    assertThat(actualType)
+        .isEqualTo(
+            StructType.of(
+                NestedField.optional(1000, "data", Types.StringType.get()),
+                NestedField.optional(1001, "category_bucket", Types.IntegerType.get())));
+
+    // Remove the partition field, then drop its source column from the schema
+    table.updateSpec().removeField("category_bucket").commit();
+    table.updateSchema().deleteColumn("category").commit();
+
+    actualType = Partitioning.partitionType(table);
+    assertThat(actualType)
+        .isEqualTo(StructType.of(NestedField.optional(1000, "data", Types.StringType.get())));
+
+    table.updateSpec().removeField("data").commit();
+    table.updateSchema().deleteColumn("data").commit();
+
+    actualType = Partitioning.partitionType(table);
+    assertThat(actualType).isEqualTo(StructType.of());
+  }
+
   @Test
   public void testGroupingKeyTypeWithSpecEvolutionInV1Tables() {
     TestTables.TestTable table =
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
index dd49d8a254..7e5f5454ff 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
@@ -20,6 +20,9 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
@@ -27,6 +30,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -583,4 +587,46 @@ public class TestAlterTablePartitionFields extends ExtensionsTestBase {
           tableName, schema, spec, TableProperties.FORMAT_VERSION, formatVersion);
     }
   }
+
+  private void runCreateAndDropPartitionField(
+      String column, String partitionType, List<Object[]> expected, String predicate) {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql(
+        "CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long 
BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)",
+        tableName, formatVersion);
+    sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as 
TIMESTAMP), 2100)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, 
partitionType);
+    sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as 
TIMESTAMP), 2200)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName);
+    sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as 
TIMESTAMP), 2300)", tableName);
+    sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);
+
+    assertEquals(
+        "Should return correct data",
+        expected,
+        sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, 
predicate));
+  }
+
+  @TestTemplate
+  public void testDropPartitionAndSourceColumnLong() {
+    String predicateTs = "col_long >= 2200";
+    List<Object[]> expectedTs =
+        Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L});
+    runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, 
predicateTs);
+    runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, 
predicateTs);
+    runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, 
predicateTs);
+    runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, 
predicateTs);
+  }
+
+  @TestTemplate
+  public void testDropPartitionAndSourceColumnTimestamp() {
+    String predicate = "col_ts >= '2024-04-01 19:25:00'";
+    List<Object[]> expected =
+        Lists.newArrayList(
+            new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)},
+            new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)});
+    runCreateAndDropPartitionField("col_long", "col_long", expected, 
predicate);
+    runCreateAndDropPartitionField("col_long", "truncate(2, col_long)", 
expected, predicate);
+    runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", 
expected, predicate);
+  }
 }
