This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 6982a3289 [AMORO-2870] Rewrite the task which contains different 
eq-delete fields (#3175)
6982a3289 is described below

commit 6982a3289aabb78328dbbb4bfeceee61122256b9
Author: Xavier Bai <[email protected]>
AuthorDate: Fri Oct 18 15:21:00 2024 +0800

    [AMORO-2870] Rewrite the task which contains different eq-delete fields 
(#3175)
    
    * rewrite multiple delete ids
    
    * update comments
    
    * replace shade guava with google
    
    * replace `computeIfAbsent` with `putIfAbsent` to avoid projecting the schema 
every time
---
 .../org/apache/amoro/io/CloseablePredicate.java    |  12 +-
 .../amoro/io/reader/CombinedDeleteFilter.java      | 184 +++++++++++------
 .../apache/amoro/io/TestIcebergCombinedReader.java | 225 +++++++++++++++++++++
 3 files changed, 350 insertions(+), 71 deletions(-)

diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
index 00dc4f75e..fa5e31514 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
@@ -20,17 +20,19 @@ package org.apache.amoro.io;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Predicate;
 
 public class CloseablePredicate<T> implements Predicate<T>, Closeable {
 
   private final Predicate<T> predicate;
 
-  private final Closeable closeable;
+  private final List<Closeable> closeable;
 
-  public CloseablePredicate(Predicate<T> predicate, Closeable closeable) {
+  public CloseablePredicate(Predicate<T> predicate, Closeable... closeable) {
     this.predicate = predicate;
-    this.closeable = closeable;
+    this.closeable = Arrays.asList(closeable);
   }
 
   @Override
@@ -40,6 +42,8 @@ public class CloseablePredicate<T> implements Predicate<T>, 
Closeable {
 
   @Override
   public void close() throws IOException {
-    closeable.close();
+    for (Closeable closeable : closeable) {
+      closeable.close();
+    }
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index f81fea10b..830976ea2 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -23,9 +23,12 @@ import org.apache.amoro.io.CloseablePredicate;
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
 import org.apache.amoro.utils.ContentFiles;
 import org.apache.amoro.utils.map.StructLikeBaseMap;
@@ -51,10 +54,12 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.StructProjection;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -90,17 +95,23 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
 
   private final RewriteFilesInput input;
   private final List<DeleteFile> posDeletes;
-  private final List<DeleteFile> eqDeletes;
+  // There may have multiple equality delete fields within a rewrite input
+  private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
+      Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
 
   private Map<String, Roaring64Bitmap> positionMap;
 
   private final Set<String> positionPathSets;
 
-  private Set<Integer> deleteIds = new HashSet<>();
+  // The delete ids are union of all equality delete fields
+  private final Set<Integer> deleteIds = new HashSet<>();
 
   private CloseablePredicate<StructForDelete<T>> eqPredicate;
 
-  private final Schema deleteSchema;
+  // Include all identifier fields of equality delete files
+  private final Schema requiredSchema;
+
+  private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new 
HashMap<>();
 
   private StructLikeCollections structLikeCollections = 
StructLikeCollections.DEFAULT;
 
@@ -115,34 +126,19 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
     this.dataRecordCnt =
         
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
     ImmutableList.Builder<DeleteFile> posDeleteBuilder = 
ImmutableList.builder();
-    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = 
ImmutableList.builder();
     if (rewriteFilesInput.deleteFiles() != null) {
-      String firstDeleteFilePath = null;
       for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
         switch (delete.content()) {
           case POSITION_DELETES:
             posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
             break;
           case EQUALITY_DELETES:
-            if (deleteIds.isEmpty()) {
-              deleteIds = 
ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              firstDeleteFilePath = delete.path().toString();
-            } else {
-              Set<Integer> currentDeleteIds =
-                  
ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              if (!deleteIds.equals(currentDeleteIds)) {
-                throw new IllegalArgumentException(
-                    String.format(
-                        "Equality delete files have different delete fields, 
first equality field ids:[%s],"
-                            + " current equality field ids:[%s], first delete 
file path:[%s], "
-                            + " current delete file path: [%s].",
-                        deleteIds,
-                        currentDeleteIds,
-                        firstDeleteFilePath,
-                        delete.path().toString()));
-              }
-            }
-            eqDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
+            DeleteFile deleteFile = ContentFiles.asDeleteFile(delete);
+            Set<Integer> eqFieldIds = 
Sets.newHashSet(delete.equalityFieldIds());
+            deleteIds.addAll(eqFieldIds);
+            eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
+            deleteSchemaByDeleteIds.computeIfAbsent(
+                eqFieldIds, ids -> TypeUtil.select(tableSchema, ids));
             break;
           default:
             throw new UnsupportedOperationException(
@@ -155,8 +151,7 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
             .map(s -> s.path().toString())
             .collect(Collectors.toSet());
     this.posDeletes = posDeleteBuilder.build();
-    this.eqDeletes = eqDeleteBuilder.build();
-    this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);
+    this.requiredSchema = TypeUtil.select(tableSchema, deleteIds);
 
     if (structLikeCollections != null) {
       this.structLikeCollections = structLikeCollections;
@@ -186,12 +181,20 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
 
   protected abstract AuthenticatedFileIO getFileIO();
 
+  /**
+   * Get all delete ids of equality delete files
+   *
+   * <p>For example, if there are two equality delete fields, one is [1, 2] 
and another is [1], the
+   * delete ids will be [1, 2].
+   *
+   * @return delete ids
+   */
   public Set<Integer> deleteIds() {
     return deleteIds;
   }
 
   public boolean hasPosition() {
-    return posDeletes != null && posDeletes.size() > 0;
+    return posDeletes != null && !posDeletes.isEmpty();
   }
 
   public void close() {
@@ -232,35 +235,85 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
       return eqPredicate;
     }
 
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return record -> false;
     }
 
-    InternalRecordWrapper internalRecordWrapper =
-        new InternalRecordWrapper(deleteSchema.asStruct());
+    List<Predicate<StructForDelete<T>>> isInDeleteSets = Lists.newArrayList();
+    List<Closeable> structMapCloseable = Lists.newArrayList();
+    BloomFilter<StructLike> bloomFilter = initializeBloomFilter();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : 
deleteSchemaByDeleteIds.entrySet()) {
+      Predicate<StructForDelete<T>> predicate =
+          applyEqDeletesForSchema(deleteSchemaEntry, bloomFilter, 
structMapCloseable);
+      isInDeleteSets.add(predicate);
+    }
 
-    BloomFilter<StructLike> bloomFilter = null;
-    if (filterEqDelete) {
-      LOG.debug(
-          "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) 
data count is {}",
-          dataRecordCnt);
-      // one million data is about 1.71M memory usage
-      bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, 
dataRecordCnt, 0.001);
-      try (CloseableIterable<Record> deletes =
-          CloseableIterable.concat(
-              CloseableIterable.transform(
-                  CloseableIterable.withNoopClose(
-                      
Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
-                  s -> openFile(s, deleteSchema)))) {
-        for (Record record : deletes) {
-          StructLike identifier = internalRecordWrapper.copyFor(record);
-          bloomFilter.put(identifier);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    Predicate<StructForDelete<T>> isInDelete =
+        isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
+    this.eqPredicate =
+        new CloseablePredicate<>(isInDelete, structMapCloseable.toArray(new 
Closeable[0]));
+    return isInDelete;
+  }
+
+  private BloomFilter<StructLike> initializeBloomFilter() {
+    if (!filterEqDelete) {
+      return null;
+    }
+
+    LOG.debug(
+        "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data 
count is {}",
+        dataRecordCnt);
+    // one million data is about 1.71M memory usage
+    BloomFilter<StructLike> bloomFilter =
+        BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
+
+    Map<Set<Integer>, InternalRecordWrapper> recordWrappers = 
Maps.newHashMap();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : 
deleteSchemaByDeleteIds.entrySet()) {
+      Set<Integer> ids = deleteSchemaEntry.getKey();
+      Schema deleteSchema = deleteSchemaEntry.getValue();
+
+      InternalRecordWrapper internalRecordWrapper =
+          new InternalRecordWrapper(deleteSchema.asStruct());
+      recordWrappers.put(ids, internalRecordWrapper);
+    }
+
+    try (CloseableIterable<Record> deletes = readRecords()) {
+      for (Record record : deletes) {
+        recordWrappers.forEach(
+            (ids, internalRecordWrapper) -> {
+              Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
+              StructProjection projection =
+                  StructProjection.create(requiredSchema, 
deleteSchema).wrap(record);
+              StructLike deletePK = internalRecordWrapper.copyFor(projection);
+              bloomFilter.put(deletePK);
+            });
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
 
+    return bloomFilter;
+  }
+
+  private CloseableIterable<Record> readRecords() {
+    return CloseableIterable.concat(
+        CloseableIterable.transform(
+            CloseableIterable.withNoopClose(
+                Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
+            s -> openFile(s, requiredSchema)));
+  }
+
+  private Predicate<StructForDelete<T>> applyEqDeletesForSchema(
+      Map.Entry<Set<Integer>, Schema> deleteSchemaEntry,
+      BloomFilter<StructLike> bloomFilter,
+      List<Closeable> structMapCloseable) {
+    Set<Integer> ids = deleteSchemaEntry.getKey();
+    Schema deleteSchema = deleteSchemaEntry.getValue();
+    Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
+
+    InternalRecordWrapper internalRecordWrapper =
+        new InternalRecordWrapper(deleteSchema.asStruct());
+
     CloseableIterable<RecordWithLsn> deleteRecords =
         CloseableIterable.transform(
             CloseableIterable.concat(
@@ -294,23 +347,20 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    structMapCloseable.add(structLikeMap);
+
+    return structForDelete -> {
+      StructProjection deleteProjection =
+          StructProjection.create(requiredSchema, 
deleteSchema).wrap(structForDelete.getPk());
+      StructLike dataPk = internalRecordWrapper.copyFor(deleteProjection);
+      Long dataLSN = structForDelete.getLsn();
+      Long deleteLsn = structLikeMap.get(dataPk);
+      if (deleteLsn == null) {
+        return false;
+      }
 
-    Predicate<StructForDelete<T>> isInDeleteSet =
-        structForDelete -> {
-          StructLike dataPk = 
internalRecordWrapper.copyFor(structForDelete.getPk());
-          Long dataLSN = structForDelete.getLsn();
-          Long deleteLsn = structLikeMap.get(dataPk);
-          if (deleteLsn == null) {
-            return false;
-          }
-
-          return deleteLsn.compareTo(dataLSN) > 0;
-        };
-
-    CloseablePredicate<StructForDelete<T>> closeablePredicate =
-        new CloseablePredicate<>(isInDeleteSet, structLikeMap);
-    this.eqPredicate = closeablePredicate;
-    return isInDeleteSet;
+      return deleteLsn.compareTo(dataLSN) > 0;
+    };
   }
 
   private CloseableIterable<StructForDelete<T>> applyEqDeletes(
@@ -322,7 +372,7 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
   private CloseableIterable<StructForDelete<T>> eqDeletesBase(
       CloseableIterable<StructForDelete<T>> records, 
Predicate<StructForDelete<T>> predicate) {
     // Predicate to test whether a row should be visible to user after 
applying equality deletions.
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return records;
     }
 
diff --git 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
index 3e33363c4..01eaf4180 100644
--- 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
@@ -297,4 +297,229 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
     }
     dataReader.close();
   }
+
+  @Test
+  public void readDataDropAEqField() throws IOException {
+    CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L;
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, 
"1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, 
"1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, 
"1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, 
Sets.newHashSet(1, 2));
+    GenericRecord idRecord = GenericRecord.create(idSchema1);
+    List<Record> records = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records.add(idRecord.copy("id", id, 
"name", "john")));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records,
+            idSchema1);
+
+    // Assuming that drop an identifier field
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, 
Sets.newHashSet(1));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records2.add(idRecord2.copy("id", 
id)));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+
+    RewriteFilesInput task2 =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 
3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 
3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task2);
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(1, Iterables.size(readRecords));
+    }
+
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) 
{
+      Assert.assertEquals(2, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
+
+  @Test
+  public void readDataReplaceAEqField() throws IOException {
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, 
"1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, 
"1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, 
"1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, 
Sets.newHashSet(1));
+    GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+    List<Record> records1 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id", 
id)));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records1,
+            idSchema1);
+
+    // Write records and identifier field is `name` instead
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, 
Sets.newHashSet(2));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    records2.add(idRecord2.copy("name", "john"));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+    RewriteFilesInput task =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 
3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 
3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task);
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(0, Iterables.size(readRecords));
+    }
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) 
{
+      Assert.assertEquals(3, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
+
+  @Test
+  public void readReadAddAEqField() throws IOException {
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, 
"1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, 
"1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, 
"1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, 
Sets.newHashSet(1));
+    GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+    List<Record> records1 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id", 
id)));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records1,
+            idSchema1);
+
+    // Write delete records and add a new field `name`
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, 
Sets.newHashSet(1, 2));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    IntStream.range(1, 100).forEach(id -> records2.add(idRecord2.copy("id", 
id, "name", "john")));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+    RewriteFilesInput task =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 
3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 
3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task);
+
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(0, Iterables.size(readRecords));
+    }
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) 
{
+      Assert.assertEquals(3, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
 }

Reply via email to