This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e40c2d6537 Core: Fix data loss in partial variant shredding (#15087)
e40c2d6537 is described below
commit e40c2d653793c3ec27fee5be32de38f57c0dd22e
Author: yan zhang <[email protected]>
AuthorDate: Fri Jan 30 11:24:26 2026 +0800
Core: Fix data loss in partial variant shredding (#15087)
---
.../apache/iceberg/variants/ShreddedObject.java | 4 +-
.../iceberg/variants/TestShreddedObject.java | 28 ++++++++++++
.../apache/iceberg/parquet/TestVariantWriters.java | 51 ++++++++++++++++++++++
3 files changed, 81 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
index 3aaefc562a..471097e855 100644
--- a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
+++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
@@ -154,12 +154,12 @@ public class ShreddedObject implements VariantObject {
private SerializationState(
VariantMetadata metadata,
VariantObject unshredded,
- Map<String, VariantValue> shreddedFields,
+ Map<String, VariantValue> shredded,
Set<String> removedFields) {
this.metadata = metadata;
// field ID size is the size needed to store the largest field ID in the data
this.fieldIdSize = VariantUtil.sizeOf(metadata.dictionarySize());
- this.shreddedFields = Maps.newHashMap(shreddedFields);
+ this.shreddedFields = Maps.newHashMap(shredded);
int totalDataSize = 0;
// get the unshredded field names and values as byte buffers
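The change above is subtle: the constructor parameter previously shared the name shreddedFields with the instance field it initializes, so any later unqualified reference inside the constructor bound to the parameter rather than to the defensive copy. A minimal sketch of that shadowing hazard (illustrative class and names only, not the Iceberg source, and assuming later constructor code mutates the map):

    import java.util.HashMap;
    import java.util.Map;

    class ShadowingSketch {
      private final Map<String, Integer> shreddedFields;

      ShadowingSketch(Map<String, Integer> shreddedFields) {
        // defensive copy: the instance field gets its own map
        this.shreddedFields = new HashMap<>(shreddedFields);
        // BUG: the unqualified name binds to the parameter, not the copy,
        // so this entry never reaches this.shreddedFields
        shreddedFields.put("late", 1);
      }

      boolean sawLateEntry() {
        // false: the copy never saw the put above
        return shreddedFields.containsKey("late");
      }
    }

Renaming the parameter, as this commit does, makes every remaining shreddedFields reference inside the class unambiguously mean the field.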
diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
index 7556214d03..cea1c6922c 100644
--- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
+++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
@@ -213,6 +213,28 @@ public class TestShreddedObject {
.isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12"));
}
+ @Test
+ public void testPartiallyShreddedUnserializedObjectSerializationMinimalBuffer() {
+ ShreddedObject partial = createUnserializedObject(FIELDS);
+ VariantMetadata metadata = partial.metadata();
+
+ // replace field c with a new value and remove field b
+ partial.put("c", Variants.ofIsoDate("2024-10-12"));
+ partial.remove("b");
+
+ VariantValue value = roundTripMinimalBuffer(partial, metadata);
+
+ assertThat(value).isInstanceOf(SerializedObject.class);
+ SerializedObject actual = (SerializedObject) value;
+
+ assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class);
+ assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34);
+ assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class);
+ assertThat(actual.get("c").type()).isEqualTo(PhysicalType.DATE);
+ assertThat(actual.get("c").asPrimitive().get())
+ .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12"));
+ }
+
@Test
public void testPartiallyShreddedObjectSerializationLargeBuffer() {
ShreddedObject partial = createUnshreddedObject(FIELDS);
@@ -404,6 +426,12 @@ public class TestShreddedObject {
return object;
}
+ private static ShreddedObject createUnserializedObject(Map<String, VariantValue> fields) {
+ ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(fields.keySet(), false);
+ VariantMetadata metadata = SerializedMetadata.from(metadataBuffer);
+ return new ShreddedObject(metadata, createShreddedObject(metadata, fields));
+ }
+
/** Creates a ShreddedObject with fields in its shredded map */
private static ShreddedObject createShreddedObject(Map<String, VariantValue> fields) {
ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(fields.keySet(), false);
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java
index 2c83406f9f..13521ec029 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantArray;
@@ -52,6 +53,7 @@ import org.apache.iceberg.variants.Variants;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;
@@ -280,4 +282,53 @@ public class TestVariantWriters {
return arr;
}
+
+ @Test
+ public void testPartialShreddingWithShreddedObject() throws IOException {
+ // Test for issue #15086: partial shredding with ShreddedObject created using put()
+ // Create a ShreddedObject with multiple fields, then partially shred it
+ VariantMetadata metadata = Variants.metadata("id", "name", "city");
+
+ // Create objects using ShreddedObject.put() instead of serialized buffers
+ List<Record> records = Lists.newArrayList();
+ for (int i = 0; i < 3; i++) {
+ ShreddedObject obj = Variants.object(metadata);
+ obj.put("id", Variants.of(1000L + i));
+ obj.put("name", Variants.of("user_" + i));
+ obj.put("city", Variants.of("city_" + i));
+
+ Variant variant = Variant.of(metadata, obj);
+ Record record = RECORD.copy("id", i, "var", variant);
+ records.add(record);
+ }
+
+ // Shredding function that only shreds the "id" field
+ VariantShreddingFunction partialShredding =
+ (id, name) -> {
+ VariantMetadata shreddedMetadata = Variants.metadata("id");
+ ShreddedObject shreddedObject = Variants.object(shreddedMetadata);
+ shreddedObject.put("id", Variants.of(1234L));
+ return ParquetVariantUtil.toParquetSchema(shreddedObject);
+ };
+
+ // Write and read back
+ List<Record> actual = writeAndRead(partialShredding, records);
+
+ // Verify all records match
+ assertThat(actual).hasSameSizeAs(records);
+ for (int i = 0; i < records.size(); i++) {
+ Record expected = records.get(i);
+ Record read = actual.get(i);
+
+ InternalTestHelpers.assertEquals(SCHEMA.asStruct(), expected, read);
+
+ // Also verify the variant object has all fields intact
+ Variant readVariant = (Variant) read.getField("var");
+ VariantObject readObj = readVariant.value().asObject();
+ assertThat(readObj.numFields()).isEqualTo(3);
+ assertThat(readObj.get("id").asPrimitive().get()).isEqualTo(1000L + i);
+ assertThat(readObj.get("name").asPrimitive().get()).isEqualTo("user_" +
i);
+ assertThat(readObj.get("city").asPrimitive().get()).isEqualTo("city_" +
i);
+ }
+ }
}
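For context, the new parquet test exercises the user-visible path from issue #15086: variant objects assembled with ShreddedObject.put() and then only partially shredded on write. Condensed to the construction calls that appear in the test above (values for the first record):

    VariantMetadata metadata = Variants.metadata("id", "name", "city");
    ShreddedObject obj = Variants.object(metadata);
    obj.put("id", Variants.of(1000L));
    obj.put("name", Variants.of("user_0"));
    obj.put("city", Variants.of("city_0"));
    Variant variant = Variant.of(metadata, obj);
    // Before this commit, writing such a variant while shredding only "id"
    // could drop the unshredded "name" and "city" fields; the test asserts
    // that all three fields survive the round trip.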