This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 70f9239cde Core: Fix leak of DeleteFile streams (#8132)
70f9239cde is described below
commit 70f9239cde0ca96adef70e91c97b0e7f1d1ee086
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Aug 7 10:46:35 2023 -0700
Core: Fix leak of DeleteFile streams (#8132)
---
.../java/org/apache/iceberg/deletes/Deletes.java | 64 ++++++++++---
.../apache/iceberg/deletes/TestPositionFilter.java | 105 +++++++++++++++++++++
2 files changed, 157 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index beec06e045..2dee8b8f58 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -39,8 +39,13 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Deletes {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Deletes.class);
+
private static final Schema POSITION_DELETE_SCHEMA =
new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
@@ -219,7 +224,7 @@ public class Deletes {
CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
nextDeletePos = deletePosIterator.next();
- iter = applyDelete(rows.iterator());
+ iter = applyDelete(rows.iterator(), deletePosIterator);
} else {
iter = rows.iterator();
}
@@ -249,7 +254,8 @@ public class Deletes {
return isDeleted;
}
- protected abstract CloseableIterator<T> applyDelete(CloseableIterator<T> items);
+ protected abstract CloseableIterator<T> applyDelete(
+ CloseableIterator<T> items, CloseableIterator<Long> deletePositions);
}
private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
@@ -265,7 +271,8 @@ public class Deletes {
}
@Override
- protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
+ protected CloseableIterator<T> applyDelete(
+ CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
return new FilterIterator<T>(items) {
@Override
protected boolean shouldKeep(T item) {
@@ -276,6 +283,16 @@ public class Deletes {
return !deleted;
}
+
+ @Override
+ public void close() {
+ try {
+ deletePositions.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing delete file", e);
+ }
+ super.close();
+ }
};
}
}
@@ -293,15 +310,38 @@ public class Deletes {
}
@Override
- protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
- return CloseableIterator.transform(
- items,
- row -> {
- if (isDeleted(row)) {
- markDeleted.accept(row);
- }
- return row;
- });
+ protected CloseableIterator<T> applyDelete(
+ CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
+
+ return new CloseableIterator<T>() {
+ @Override
+ public void close() {
+ try {
+ deletePositions.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing delete file", e);
+ }
+ try {
+ items.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing data file", e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return items.hasNext();
+ }
+
+ @Override
+ public T next() {
+ T row = items.next();
+ if (isDeleted(row)) {
+ markDeleted.accept(row);
+ }
+ return row;
+ }
+ };
}
}
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
index 9c384184b0..16a5695104 100644
--- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
@@ -20,12 +20,16 @@ package org.apache.iceberg.deletes;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -321,4 +325,105 @@ public class TestPositionFilter {
.as("Filter should produce expected rows")
.containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L));
}
+
+ @Test
+ public void testClosePositionStreamRowDeleteMarker() {
+ List<Long> deletes = Lists.newArrayList(1L, 2L);
+
+ List<StructLike> records =
+ Lists.newArrayList(
+ Row.of(29, "a", 1L), Row.of(43, "b", 2L), Row.of(61, "c", 3L), Row.of(89, "d", 4L));
+
+ CheckingClosableIterable<StructLike> data = new CheckingClosableIterable<>(records);
+ CheckingClosableIterable<Long> deletePositions = new CheckingClosableIterable<>(deletes);
+
+ CloseableIterable<StructLike> posDeletesIterable =
+ Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions);
+
+ // end iterator is always wrapped with FilterIterator
+ CloseableIterable<StructLike> eqDeletesIterable =
+ Deletes.filterDeleted(posDeletesIterable, i -> false, new DeleteCounter());
+ List<StructLike> result = Lists.newArrayList(eqDeletesIterable.iterator());
+
+ // as first two records deleted, expect only last two records
+ assertThat(Iterables.transform(result, row -> row.get(2, Long.class)))
+ .as("Filter should produce expected rows")
+ .containsExactlyElementsOf(Lists.newArrayList(3L, 4L));
+
+ assertThat(data.isClosed).isTrue();
+ assertThat(deletePositions.isClosed).isTrue();
+ }
+
+ @Test
+ public void testDeleteMarkerFileClosed() {
+
+ List<Long> deletes = Lists.newArrayList(1L, 2L);
+
+ List<StructLike> records =
+ Lists.newArrayList(
+ Row.of(29, "a", 1L, false),
+ Row.of(43, "b", 2L, false),
+ Row.of(61, "c", 3L, false),
+ Row.of(89, "d", 4L, false));
+
+ CheckingClosableIterable<StructLike> data = new CheckingClosableIterable<>(records);
+ CheckingClosableIterable<Long> deletePositions = new CheckingClosableIterable<>(deletes);
+
+ CloseableIterable<StructLike> resultIterable =
+ Deletes.streamingMarker(
+ data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true));
+
+ // end iterator is always wrapped with FilterIterator
+ CloseableIterable<StructLike> eqDeletesIterable =
+ Deletes.filterDeleted(resultIterable, i -> false, new DeleteCounter());
+ List<StructLike> result = Lists.newArrayList(eqDeletesIterable.iterator());
+
+ // as first two records deleted, expect only those two records marked
+ assertThat(Iterables.transform(result, row -> row.get(3, Boolean.class)))
+ .as("Filter should produce expected rows")
+ .containsExactlyElementsOf(Lists.newArrayList(true, true, false, false));
+
+ assertThat(data.isClosed).isTrue();
+ assertThat(deletePositions.isClosed).isTrue();
+ }
+
+ private static class CheckingClosableIterable<E> implements CloseableIterable<E> {
+ AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Iterable<E> iterable;
+
+ CheckingClosableIterable(Iterable<E> iterable) {
+ this.iterable = iterable;
+ }
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ @Override
+ public void close() throws IOException {
+ isClosed.set(true);
+ }
+
+ @Override
+ public CloseableIterator<E> iterator() {
+ Iterator<E> it = iterable.iterator();
+ return new CloseableIterator<E>() {
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public E next() {
+ return it.next();
+ }
+
+ @Override
+ public void close() {
+ isClosed.set(true);
+ }
+ };
+ }
+ }
}