This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new a966f4987f Core, Spark: Handle unknown type during deletes (#14356) (#14548)
a966f4987f is described below
commit a966f4987f313a1dd841fc9a82ec51298c032a4f
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Nov 10 13:32:34 2025 +0100
Core, Spark: Handle unknown type during deletes (#14356) (#14548)
---
.../java/org/apache/iceberg/PartitionSpec.java | 5 +
.../java/org/apache/iceberg/types/Comparators.java | 1 +
.../java/org/apache/iceberg/types/Conversions.java | 6 +
.../java/org/apache/iceberg/avro/ValueWriters.java | 7 +-
.../org/apache/iceberg/util/ManifestFileUtil.java | 6 +
.../java/org/apache/iceberg/TestPartitioning.java | 43 +++++++
.../apache/iceberg/util/TestManifestFileUtil.java | 127 +++++++++++++++++++++
.../extensions/TestAlterTablePartitionFields.java | 26 ++++-
8 files changed, 214 insertions(+), 7 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index f059c928a9..a8b29c4a9d 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -184,6 +184,11 @@ public class PartitionSpec implements Serializable {
classes[i] = Object.class;
} else {
Type sourceType = schema.findType(field.sourceId());
+ if (null == sourceType) {
+ // When the source field has been dropped we cannot determine the type
+ sourceType = Types.UnknownType.get();
+ }
+
+
Type result = field.transform().getResultType(sourceType);
classes[i] = result.typeId().javaClass();
}
diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java
b/api/src/main/java/org/apache/iceberg/types/Comparators.java
index 32168d9a09..ab59c89568 100644
--- a/api/src/main/java/org/apache/iceberg/types/Comparators.java
+++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java
@@ -48,6 +48,7 @@ public class Comparators {
.put(Types.StringType.get(), Comparators.charSequences())
.put(Types.UUIDType.get(), Comparator.naturalOrder())
.put(Types.BinaryType.get(), Comparators.unsignedBytes())
+ .put(Types.UnknownType.get(), Comparator.nullsFirst(Comparator.naturalOrder()))
.buildOrThrow();
public static Comparator<StructLike> forType(Types.StructType struct) {
diff --git a/api/src/main/java/org/apache/iceberg/types/Conversions.java
b/api/src/main/java/org/apache/iceberg/types/Conversions.java
index e18c7b4362..22adcf7ca5 100644
--- a/api/src/main/java/org/apache/iceberg/types/Conversions.java
+++ b/api/src/main/java/org/apache/iceberg/types/Conversions.java
@@ -117,6 +117,9 @@ public class Conversions {
return (ByteBuffer) value;
case DECIMAL:
return ByteBuffer.wrap(((BigDecimal) value).unscaledValue().toByteArray());
+ case UNKNOWN:
+ // underlying type not known
+ return null;
default:
throw new UnsupportedOperationException("Cannot serialize type: " + typeId);
}
@@ -177,6 +180,9 @@ public class Conversions {
byte[] unscaledBytes = new byte[buffer.remaining()];
tmp.get(unscaledBytes);
return new BigDecimal(new BigInteger(unscaledBytes), decimal.scale());
+ case UNKNOWN:
+ // underlying type not known
+ return null;
default:
throw new UnsupportedOperationException("Cannot deserialize type: " +
type);
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
index 580175c5f8..b31d157302 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
@@ -144,13 +144,14 @@ public class ValueWriters {
return new StructLikeWriter(writers);
}
- private static class NullWriter implements ValueWriter<Void> {
- private static final NullWriter INSTANCE = new NullWriter();
+ private static class NullWriter implements ValueWriter<Object> {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static final ValueWriter<Void> INSTANCE = (ValueWriter) new NullWriter();
private NullWriter() {}
@Override
- public void write(Void ignored, Encoder encoder) throws IOException {
+ public void write(Object ignored, Encoder encoder) throws IOException {
encoder.writeNull();
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
b/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
index a73a00d0e6..56a385ebf2 100644
--- a/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
@@ -41,9 +41,11 @@ public class ManifestFileUtil {
private final T upperBound;
private final boolean containsNull;
private final boolean containsNaN;
+ private final Type.PrimitiveType type;
@SuppressWarnings("unchecked")
FieldSummary(Type.PrimitiveType primitive, ManifestFile.PartitionFieldSummary summary) {
+ this.type = primitive;
this.comparator = Comparators.forType(primitive);
this.javaClass = (Class<T>) primitive.typeId().javaClass();
this.lowerBound = Conversions.fromByteBuffer(primitive, summary.lowerBound());
@@ -61,6 +63,10 @@ public class ManifestFileUtil {
return containsNaN;
}
+ if (Types.UnknownType.get().equals(type)) {
+ return true;
+ }
+
// if lower bound is null, then there are no non-null values
if (lowerBound == null) {
// the value is non-null, so it cannot match
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index eb77a693c7..dc362d33c3 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -459,4 +459,47 @@ public class TestPartitioning {
assertThat(table.spec()).isEqualTo(spec);
}
+
+ @Test
+ public void deleteFileAfterDeletingAllPartitionFields() {
+ TestTables.TestTable table =
+ TestTables.create(tableDir, "test", SCHEMA, BY_DATA_SPEC,
V2_FORMAT_VERSION);
+
+ DataFile dataFile =
+ DataFiles.builder(BY_DATA_SPEC)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("data=1")
+ .withRecordCount(1)
+ .build();
+
+ table.newAppend().appendFile(dataFile).commit();
+
+ assertThat(table.currentSnapshot().summary()).containsEntry("added-data-files", "1");
+ table.updateSpec().removeField("data").commit();
+ table.updateSchema().deleteColumn("data").commit();
+ table.newDelete().deleteFile(dataFile).commit();
+
assertThat(table.currentSnapshot().summary()).containsEntry("deleted-data-files",
"1");
+ }
+
+ @Test
+ public void deleteFileAfterDeletingOnePartitionField() {
+ TestTables.TestTable table =
+ TestTables.create(tableDir, "test", SCHEMA, BY_CATEGORY_DATA_SPEC,
V2_FORMAT_VERSION);
+
+ // drop one out of 2 partition fields
+ DataFile dataFile =
+ DataFiles.builder(BY_CATEGORY_DATA_SPEC)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("category=2/data=2")
+ .withRecordCount(1)
+ .build();
+
+ table.newAppend().appendFile(dataFile).commit();
+
assertThat(table.currentSnapshot().summary()).containsEntry("added-data-files",
"1");
+ table.updateSpec().removeField("data").commit();
+ table.updateSchema().deleteColumn("data").commit();
+ table.newDelete().deleteFile(dataFile).commit();
+
assertThat(table.currentSnapshot().summary()).containsEntry("deleted-data-files",
"1");
+ }
}
diff --git
a/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
b/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
new file mode 100644
index 0000000000..8d24160320
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestManifestFileUtil {
+ private static final Schema SCHEMA =
+ new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "unknown", Types.UnknownType.get()),
+ optional(3, "floats", Types.FloatType.get()));
+
+ @TempDir private Path temp;
+
+ @Test
+ public void canContainWithUnknownTypeOnly() throws IOException {
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("unknown").build();
+ PartitionData partition = new PartitionData(spec.partitionType());
+ partition.set(0, "someValue");
+ ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+
+ assertThat(
+ ManifestFileUtil.canContainAny(
+ manifestFile,
+ ImmutableList.of(Pair.of(spec.specId(), partition)),
+ ImmutableMap.of(spec.specId(), spec)))
+ .isTrue();
+ }
+
+ @Test
+ public void canContainWithNaNValueOnly() throws IOException {
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("floats").build();
+ PartitionData partition = new PartitionData(spec.partitionType());
+ partition.set(0, Float.NaN);
+ ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+
+ assertThat(
+ ManifestFileUtil.canContainAny(
+ manifestFile,
+ ImmutableList.of(Pair.of(spec.specId(), partition)),
+ ImmutableMap.of(spec.specId(), spec)))
+ .isTrue();
+ }
+
+ @Test
+ public void canContainWithNullValueOnly() throws IOException {
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("floats").build();
+ PartitionData partition = new PartitionData(spec.partitionType());
+ partition.set(0, null);
+ ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+
+ assertThat(
+ ManifestFileUtil.canContainAny(
+ manifestFile,
+ ImmutableList.of(Pair.of(spec.specId(), partition)),
+ ImmutableMap.of(spec.specId(), spec)))
+ .isTrue();
+ }
+
+ @Test
+ public void canContainWithUnknownType() throws IOException {
+ PartitionSpec spec =
+ PartitionSpec.builderFor(SCHEMA).identity("floats").identity("unknown").build();
+ PartitionData partition = new PartitionData(spec.partitionType());
+ partition.set(0, 1.0f);
+ partition.set(1, "someValue");
+ ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+
+ assertThat(
+ ManifestFileUtil.canContainAny(
+ manifestFile,
+ ImmutableList.of(Pair.of(spec.specId(), partition)),
+ ImmutableMap.of(spec.specId(), spec)))
+ .isTrue();
+ }
+
+ private ManifestFile writeManifestWithDataFile(PartitionSpec spec, PartitionData partition)
+ throws IOException {
+ ManifestWriter<DataFile> writer = ManifestFiles.write(spec, Files.localOutput(temp.toFile()));
+ try (writer) {
+ writer.add(
+ DataFiles.builder(spec)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(partition)
+ .withRecordCount(10)
+ .build());
+ }
+
+ return writer.toManifestFile();
+ }
+}
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
index d3d0d9b910..296564e20d 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
@@ -601,10 +601,8 @@ public class TestAlterTablePartitionFields extends
ExtensionsTestBase {
sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as
TIMESTAMP), 2300)", tableName);
sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);
- assertEquals(
- "Should return correct data",
- expected,
- sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName,
predicate));
+ assertThat(sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName,
predicate))
+ .containsExactlyElementsOf(expected);
}
@TestTemplate
@@ -629,4 +627,24 @@ public class TestAlterTablePartitionFields extends
ExtensionsTestBase {
runCreateAndDropPartitionField("col_long", "truncate(2, col_long)",
expected, predicate);
runCreateAndDropPartitionField("col_long", "bucket(16, col_long)",
expected, predicate);
}
+
+ @TestTemplate
+ public void deleteAfterDroppingPartitionAndSourceColumn() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql(
+ "CREATE TABLE %s (id INTEGER, data STRING) USING ICEBERG TBLPROPERTIES
('format-version' = %d)",
+ tableName, formatVersion);
+ sql("INSERT INTO %s VALUES (1, 'data1')", tableName);
+ sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+ sql("INSERT INTO %s VALUES (2, 'data2')", tableName);
+ sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+ sql("INSERT INTO %s VALUES (3, 'data3')", tableName);
+ sql("ALTER TABLE %s DROP COLUMN data", tableName);
+
+ assertThat(sql("SELECT * FROM %s WHERE id >= 1 ORDER BY id", tableName))
+ .containsExactly(row(1), row(2), row(3));
+
+ sql("DELETE FROM %s WHERE id >= 1", tableName);
+ assertThat(sql("SELECT * FROM %s WHERE id >= 1", tableName)).isEmpty();
+ }
}