This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 6982a3289 [AMORO-2870] Rewrite the task which contains different
eq-delete fields (#3175)
6982a3289 is described below
commit 6982a3289aabb78328dbbb4bfeceee61122256b9
Author: Xavier Bai <[email protected]>
AuthorDate: Fri Oct 18 15:21:00 2024 +0800
[AMORO-2870] Rewrite the task which contains different eq-delete fields
(#3175)
* rewrite multiple delete ids
* update comments
* replace shade guava with google
* replace `computeIfAbsent` with `putIfAbsent` to avoid projecting schema
every time
---
.../org/apache/amoro/io/CloseablePredicate.java | 12 +-
.../amoro/io/reader/CombinedDeleteFilter.java | 184 +++++++++++------
.../apache/amoro/io/TestIcebergCombinedReader.java | 225 +++++++++++++++++++++
3 files changed, 350 insertions(+), 71 deletions(-)
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
index 00dc4f75e..fa5e31514 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
@@ -20,17 +20,19 @@ package org.apache.amoro.io;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import java.util.function.Predicate;
public class CloseablePredicate<T> implements Predicate<T>, Closeable {
private final Predicate<T> predicate;
- private final Closeable closeable;
+ private final List<Closeable> closeable;
- public CloseablePredicate(Predicate<T> predicate, Closeable closeable) {
+ public CloseablePredicate(Predicate<T> predicate, Closeable... closeable) {
this.predicate = predicate;
- this.closeable = closeable;
+ this.closeable = Arrays.asList(closeable);
}
@Override
@@ -40,6 +42,8 @@ public class CloseablePredicate<T> implements Predicate<T>,
Closeable {
@Override
public void close() throws IOException {
- closeable.close();
+ for (Closeable closeable : closeable) {
+ closeable.close();
+ }
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index f81fea10b..830976ea2 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -23,9 +23,12 @@ import org.apache.amoro.io.CloseablePredicate;
import org.apache.amoro.optimizing.RewriteFilesInput;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.map.StructLikeBaseMap;
@@ -51,10 +54,12 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.StructProjection;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -90,17 +95,23 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
private final RewriteFilesInput input;
private final List<DeleteFile> posDeletes;
- private final List<DeleteFile> eqDeletes;
+ // There may have multiple equality delete fields within a rewrite input
+ private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
+ Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
private Map<String, Roaring64Bitmap> positionMap;
private final Set<String> positionPathSets;
- private Set<Integer> deleteIds = new HashSet<>();
+ // The delete ids are union of all equality delete fields
+ private final Set<Integer> deleteIds = new HashSet<>();
private CloseablePredicate<StructForDelete<T>> eqPredicate;
- private final Schema deleteSchema;
+ // Include all identifier fields of equality delete files
+ private final Schema requiredSchema;
+
+ private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new
HashMap<>();
private StructLikeCollections structLikeCollections =
StructLikeCollections.DEFAULT;
@@ -115,34 +126,19 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
this.dataRecordCnt =
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
ImmutableList.Builder<DeleteFile> posDeleteBuilder =
ImmutableList.builder();
- ImmutableList.Builder<DeleteFile> eqDeleteBuilder =
ImmutableList.builder();
if (rewriteFilesInput.deleteFiles() != null) {
- String firstDeleteFilePath = null;
for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
switch (delete.content()) {
case POSITION_DELETES:
posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
break;
case EQUALITY_DELETES:
- if (deleteIds.isEmpty()) {
- deleteIds =
ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
- firstDeleteFilePath = delete.path().toString();
- } else {
- Set<Integer> currentDeleteIds =
-
ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
- if (!deleteIds.equals(currentDeleteIds)) {
- throw new IllegalArgumentException(
- String.format(
- "Equality delete files have different delete fields,
first equality field ids:[%s],"
- + " current equality field ids:[%s], first delete
file path:[%s], "
- + " current delete file path: [%s].",
- deleteIds,
- currentDeleteIds,
- firstDeleteFilePath,
- delete.path().toString()));
- }
- }
- eqDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
+ DeleteFile deleteFile = ContentFiles.asDeleteFile(delete);
+ Set<Integer> eqFieldIds =
Sets.newHashSet(delete.equalityFieldIds());
+ deleteIds.addAll(eqFieldIds);
+ eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
+ deleteSchemaByDeleteIds.computeIfAbsent(
+ eqFieldIds, ids -> TypeUtil.select(tableSchema, ids));
break;
default:
throw new UnsupportedOperationException(
@@ -155,8 +151,7 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
.map(s -> s.path().toString())
.collect(Collectors.toSet());
this.posDeletes = posDeleteBuilder.build();
- this.eqDeletes = eqDeleteBuilder.build();
- this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);
+ this.requiredSchema = TypeUtil.select(tableSchema, deleteIds);
if (structLikeCollections != null) {
this.structLikeCollections = structLikeCollections;
@@ -186,12 +181,20 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
protected abstract AuthenticatedFileIO getFileIO();
+ /**
+ * Get all delete ids of equality delete files
+ *
+ * <p>For example, if there are two equality delete fields, one is [1, 2]
and another is [1], the
+ * delete ids will be [1, 2].
+ *
+ * @return delete ids
+ */
public Set<Integer> deleteIds() {
return deleteIds;
}
public boolean hasPosition() {
- return posDeletes != null && posDeletes.size() > 0;
+ return posDeletes != null && !posDeletes.isEmpty();
}
public void close() {
@@ -232,35 +235,85 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
return eqPredicate;
}
- if (eqDeletes.isEmpty()) {
+ if (eqDeleteFilesByDeleteIds.isEmpty()) {
return record -> false;
}
- InternalRecordWrapper internalRecordWrapper =
- new InternalRecordWrapper(deleteSchema.asStruct());
+ List<Predicate<StructForDelete<T>>> isInDeleteSets = Lists.newArrayList();
+ List<Closeable> structMapCloseable = Lists.newArrayList();
+ BloomFilter<StructLike> bloomFilter = initializeBloomFilter();
+ for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry :
deleteSchemaByDeleteIds.entrySet()) {
+ Predicate<StructForDelete<T>> predicate =
+ applyEqDeletesForSchema(deleteSchemaEntry, bloomFilter,
structMapCloseable);
+ isInDeleteSets.add(predicate);
+ }
- BloomFilter<StructLike> bloomFilter = null;
- if (filterEqDelete) {
- LOG.debug(
- "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos)
data count is {}",
- dataRecordCnt);
- // one million data is about 1.71M memory usage
- bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE,
dataRecordCnt, 0.001);
- try (CloseableIterable<Record> deletes =
- CloseableIterable.concat(
- CloseableIterable.transform(
- CloseableIterable.withNoopClose(
-
Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
- s -> openFile(s, deleteSchema)))) {
- for (Record record : deletes) {
- StructLike identifier = internalRecordWrapper.copyFor(record);
- bloomFilter.put(identifier);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+ Predicate<StructForDelete<T>> isInDelete =
+ isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
+ this.eqPredicate =
+ new CloseablePredicate<>(isInDelete, structMapCloseable.toArray(new
Closeable[0]));
+ return isInDelete;
+ }
+
+ private BloomFilter<StructLike> initializeBloomFilter() {
+ if (!filterEqDelete) {
+ return null;
+ }
+
+ LOG.debug(
+ "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data
count is {}",
+ dataRecordCnt);
+ // one million data is about 1.71M memory usage
+ BloomFilter<StructLike> bloomFilter =
+ BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
+
+ Map<Set<Integer>, InternalRecordWrapper> recordWrappers =
Maps.newHashMap();
+ for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry :
deleteSchemaByDeleteIds.entrySet()) {
+ Set<Integer> ids = deleteSchemaEntry.getKey();
+ Schema deleteSchema = deleteSchemaEntry.getValue();
+
+ InternalRecordWrapper internalRecordWrapper =
+ new InternalRecordWrapper(deleteSchema.asStruct());
+ recordWrappers.put(ids, internalRecordWrapper);
+ }
+
+ try (CloseableIterable<Record> deletes = readRecords()) {
+ for (Record record : deletes) {
+ recordWrappers.forEach(
+ (ids, internalRecordWrapper) -> {
+ Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
+ StructProjection projection =
+ StructProjection.create(requiredSchema,
deleteSchema).wrap(record);
+ StructLike deletePK = internalRecordWrapper.copyFor(projection);
+ bloomFilter.put(deletePK);
+ });
}
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
+ return bloomFilter;
+ }
+
+ private CloseableIterable<Record> readRecords() {
+ return CloseableIterable.concat(
+ CloseableIterable.transform(
+ CloseableIterable.withNoopClose(
+ Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
+ s -> openFile(s, requiredSchema)));
+ }
+
+ private Predicate<StructForDelete<T>> applyEqDeletesForSchema(
+ Map.Entry<Set<Integer>, Schema> deleteSchemaEntry,
+ BloomFilter<StructLike> bloomFilter,
+ List<Closeable> structMapCloseable) {
+ Set<Integer> ids = deleteSchemaEntry.getKey();
+ Schema deleteSchema = deleteSchemaEntry.getValue();
+ Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
+
+ InternalRecordWrapper internalRecordWrapper =
+ new InternalRecordWrapper(deleteSchema.asStruct());
+
CloseableIterable<RecordWithLsn> deleteRecords =
CloseableIterable.transform(
CloseableIterable.concat(
@@ -294,23 +347,20 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
} catch (IOException e) {
throw new RuntimeException(e);
}
+ structMapCloseable.add(structLikeMap);
+
+ return structForDelete -> {
+ StructProjection deleteProjection =
+ StructProjection.create(requiredSchema,
deleteSchema).wrap(structForDelete.getPk());
+ StructLike dataPk = internalRecordWrapper.copyFor(deleteProjection);
+ Long dataLSN = structForDelete.getLsn();
+ Long deleteLsn = structLikeMap.get(dataPk);
+ if (deleteLsn == null) {
+ return false;
+ }
- Predicate<StructForDelete<T>> isInDeleteSet =
- structForDelete -> {
- StructLike dataPk =
internalRecordWrapper.copyFor(structForDelete.getPk());
- Long dataLSN = structForDelete.getLsn();
- Long deleteLsn = structLikeMap.get(dataPk);
- if (deleteLsn == null) {
- return false;
- }
-
- return deleteLsn.compareTo(dataLSN) > 0;
- };
-
- CloseablePredicate<StructForDelete<T>> closeablePredicate =
- new CloseablePredicate<>(isInDeleteSet, structLikeMap);
- this.eqPredicate = closeablePredicate;
- return isInDeleteSet;
+ return deleteLsn.compareTo(dataLSN) > 0;
+ };
}
private CloseableIterable<StructForDelete<T>> applyEqDeletes(
@@ -322,7 +372,7 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
private CloseableIterable<StructForDelete<T>> eqDeletesBase(
CloseableIterable<StructForDelete<T>> records,
Predicate<StructForDelete<T>> predicate) {
// Predicate to test whether a row should be visible to user after
applying equality deletions.
- if (eqDeletes.isEmpty()) {
+ if (eqDeleteFilesByDeleteIds.isEmpty()) {
return records;
}
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
index 3e33363c4..01eaf4180 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
@@ -297,4 +297,229 @@ public class TestIcebergCombinedReader extends
TableTestBase {
}
dataReader.close();
}
+
+ @Test
+ public void readDataDropAEqField() throws IOException {
+ CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L;
+ StructLike partitionData = getPartitionData();
+ OutputFileFactory outputFileFactory =
+ OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+ .format(fileFormat)
+ .build();
+ DataFile dataFile =
+ FileHelpers.writeDataFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "john", 0,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "lily", 1,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(3, "sam", 2,
"1970-01-01T08:00:00")));
+
+ Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1, 2));
+ GenericRecord idRecord = GenericRecord.create(idSchema1);
+ List<Record> records = new ArrayList<>();
+ IntStream.range(2, 100).forEach(id -> records.add(idRecord.copy("id", id,
"name", "john")));
+ DeleteFile eqDeleteFile1 =
+ FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ records,
+ idSchema1);
+
+ // Assuming that drop an identifier field
+ Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+ List<Record> records2 = new ArrayList<>();
+ IntStream.range(2, 100).forEach(id -> records2.add(idRecord2.copy("id",
id)));
+ DeleteFile eqDeleteFile2 =
+ FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ records2,
+ idSchema2);
+
+ RewriteFilesInput task2 =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
3L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
3L)},
+ new DeleteFile[] {},
+ new DeleteFile[] {
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+ },
+ getMixedTable());
+
+ GenericCombinedIcebergDataReader dataReader =
+ new GenericCombinedIcebergDataReader(
+ getMixedTable().io(),
+ getMixedTable().schema(),
+ getMixedTable().spec(),
+ getMixedTable().asUnkeyedTable().encryption(),
+ null,
+ false,
+ IdentityPartitionConverters::convertConstant,
+ false,
+ null,
+ task2);
+ try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+ Assert.assertEquals(1, Iterables.size(readRecords));
+ }
+
+ try (CloseableIterable<Record> readRecords = dataReader.readDeletedData())
{
+ Assert.assertEquals(2, Iterables.size(readRecords));
+ }
+
+ dataReader.close();
+ }
+
+ @Test
+ public void readDataReplaceAEqField() throws IOException {
+ StructLike partitionData = getPartitionData();
+ OutputFileFactory outputFileFactory =
+ OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+ .format(fileFormat)
+ .build();
+ DataFile dataFile =
+ FileHelpers.writeDataFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "john", 0,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "lily", 1,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(3, "sam", 2,
"1970-01-01T08:00:00")));
+
+ Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+ List<Record> records1 = new ArrayList<>();
+ IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id",
id)));
+ DeleteFile eqDeleteFile1 =
+ FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ records1,
+ idSchema1);
+
+ // Write records and identifier field is `name` instead
+ Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(2));
+ GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+ List<Record> records2 = new ArrayList<>();
+ records2.add(idRecord2.copy("name", "john"));
+ DeleteFile eqDeleteFile2 =
+ FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ records2,
+ idSchema2);
+ RewriteFilesInput task =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
3L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
3L)},
+ new DeleteFile[] {},
+ new DeleteFile[] {
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+ },
+ getMixedTable());
+ GenericCombinedIcebergDataReader dataReader =
+ new GenericCombinedIcebergDataReader(
+ getMixedTable().io(),
+ getMixedTable().schema(),
+ getMixedTable().spec(),
+ getMixedTable().asUnkeyedTable().encryption(),
+ null,
+ false,
+ IdentityPartitionConverters::convertConstant,
+ false,
+ null,
+ task);
+ try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+ Assert.assertEquals(0, Iterables.size(readRecords));
+ }
+ try (CloseableIterable<Record> readRecords = dataReader.readDeletedData())
{
+ Assert.assertEquals(3, Iterables.size(readRecords));
+ }
+
+ dataReader.close();
+ }
+
+ @Test
+ public void readReadAddAEqField() throws IOException {
+ StructLike partitionData = getPartitionData();
+ OutputFileFactory outputFileFactory =
+ OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+ .format(fileFormat)
+ .build();
+ DataFile dataFile =
+ FileHelpers.writeDataFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "john", 0,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "lily", 1,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(3, "sam", 2,
"1970-01-01T08:00:00")));
+
+ Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+ List<Record> records1 = new ArrayList<>();
+ IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id",
id)));
+ DeleteFile eqDeleteFile1 =
+ FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ records1,
+ idSchema1);
+
+ // Write delete records and add a new field `name`
+ Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1, 2));
+ GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+ List<Record> records2 = new ArrayList<>();
+ IntStream.range(1, 100).forEach(id -> records2.add(idRecord2.copy("id",
id, "name", "john")));
+ DeleteFile eqDeleteFile2 =
+ FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+ partitionData,
+ records2,
+ idSchema2);
+ RewriteFilesInput task =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
3L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
3L)},
+ new DeleteFile[] {},
+ new DeleteFile[] {
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+ },
+ getMixedTable());
+
+ GenericCombinedIcebergDataReader dataReader =
+ new GenericCombinedIcebergDataReader(
+ getMixedTable().io(),
+ getMixedTable().schema(),
+ getMixedTable().spec(),
+ getMixedTable().asUnkeyedTable().encryption(),
+ null,
+ false,
+ IdentityPartitionConverters::convertConstant,
+ false,
+ null,
+ task);
+
+ try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+ Assert.assertEquals(0, Iterables.size(readRecords));
+ }
+ try (CloseableIterable<Record> readRecords = dataReader.readDeletedData())
{
+ Assert.assertEquals(3, Iterables.size(readRecords));
+ }
+
+ dataReader.close();
+ }
}