This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 70f9239cde Core: Fix leak of DeleteFile streams (#8132)
70f9239cde is described below

commit 70f9239cde0ca96adef70e91c97b0e7f1d1ee086
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Aug 7 10:46:35 2023 -0700

    Core: Fix leak of DeleteFile streams (#8132)
---
 .../java/org/apache/iceberg/deletes/Deletes.java   |  64 ++++++++++---
 .../apache/iceberg/deletes/TestPositionFilter.java | 105 +++++++++++++++++++++
 2 files changed, 157 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index beec06e045..2dee8b8f58 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -39,8 +39,13 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Filter;
 import org.apache.iceberg.util.SortedMerge;
 import org.apache.iceberg.util.StructLikeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Deletes {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Deletes.class);
+
   private static final Schema POSITION_DELETE_SCHEMA =
       new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
 
@@ -219,7 +224,7 @@ public class Deletes {
       CloseableIterator<T> iter;
       if (deletePosIterator.hasNext()) {
         nextDeletePos = deletePosIterator.next();
-        iter = applyDelete(rows.iterator());
+        iter = applyDelete(rows.iterator(), deletePosIterator);
       } else {
         iter = rows.iterator();
       }
@@ -249,7 +254,8 @@ public class Deletes {
       return isDeleted;
     }
 
-    protected abstract CloseableIterator<T> applyDelete(CloseableIterator<T> items);
+    protected abstract CloseableIterator<T> applyDelete(
+        CloseableIterator<T> items, CloseableIterator<Long> deletePositions);
   }
 
   private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
@@ -265,7 +271,8 @@ public class Deletes {
     }
 
     @Override
-    protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
+    protected CloseableIterator<T> applyDelete(
+        CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
       return new FilterIterator<T>(items) {
         @Override
         protected boolean shouldKeep(T item) {
@@ -276,6 +283,16 @@ public class Deletes {
 
           return !deleted;
         }
+
+        @Override
+        public void close() {
+          try {
+            deletePositions.close();
+          } catch (IOException e) {
+            LOG.warn("Error closing delete file", e);
+          }
+          super.close();
+        }
       };
     }
   }
@@ -293,15 +310,38 @@ public class Deletes {
     }
 
     @Override
-    protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
-      return CloseableIterator.transform(
-          items,
-          row -> {
-            if (isDeleted(row)) {
-              markDeleted.accept(row);
-            }
-            return row;
-          });
+    protected CloseableIterator<T> applyDelete(
+        CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
+
+      return new CloseableIterator<T>() {
+        @Override
+        public void close() {
+          try {
+            deletePositions.close();
+          } catch (IOException e) {
+            LOG.warn("Error closing delete file", e);
+          }
+          try {
+            items.close();
+          } catch (IOException e) {
+            LOG.warn("Error closing data file", e);
+          }
+        }
+
+        @Override
+        public boolean hasNext() {
+          return items.hasNext();
+        }
+
+        @Override
+        public T next() {
+          T row = items.next();
+          if (isDeleted(row)) {
+            markDeleted.accept(row);
+          }
+          return row;
+        }
+      };
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
index 9c384184b0..16a5695104 100644
--- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
@@ -20,12 +20,16 @@ package org.apache.iceberg.deletes;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -321,4 +325,105 @@ public class TestPositionFilter {
         .as("Filter should produce expected rows")
         .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L));
   }
+
+  @Test
+  public void testClosePositionStreamRowDeleteMarker() {
+    List<Long> deletes = Lists.newArrayList(1L, 2L);
+
+    List<StructLike> records =
+        Lists.newArrayList(
+            Row.of(29, "a", 1L), Row.of(43, "b", 2L), Row.of(61, "c", 3L), Row.of(89, "d", 4L));
+
+    CheckingClosableIterable<StructLike> data = new CheckingClosableIterable<>(records);
+    CheckingClosableIterable<Long> deletePositions = new CheckingClosableIterable<>(deletes);
+
+    CloseableIterable<StructLike> posDeletesIterable =
+        Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions);
+
+    // end iterator is always wrapped with FilterIterator
+    CloseableIterable<StructLike> eqDeletesIterable =
+        Deletes.filterDeleted(posDeletesIterable, i -> false, new DeleteCounter());
+    List<StructLike> result = Lists.newArrayList(eqDeletesIterable.iterator());
+
+    // as first two records deleted, expect only last two records
+    assertThat(Iterables.transform(result, row -> row.get(2, Long.class)))
+        .as("Filter should produce expected rows")
+        .containsExactlyElementsOf(Lists.newArrayList(3L, 4L));
+
+    assertThat(data.isClosed).isTrue();
+    assertThat(deletePositions.isClosed).isTrue();
+  }
+
+  @Test
+  public void testDeleteMarkerFileClosed() {
+
+    List<Long> deletes = Lists.newArrayList(1L, 2L);
+
+    List<StructLike> records =
+        Lists.newArrayList(
+            Row.of(29, "a", 1L, false),
+            Row.of(43, "b", 2L, false),
+            Row.of(61, "c", 3L, false),
+            Row.of(89, "d", 4L, false));
+
+    CheckingClosableIterable<StructLike> data = new CheckingClosableIterable<>(records);
+    CheckingClosableIterable<Long> deletePositions = new CheckingClosableIterable<>(deletes);
+
+    CloseableIterable<StructLike> resultIterable =
+        Deletes.streamingMarker(
+            data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true));
+
+    // end iterator is always wrapped with FilterIterator
+    CloseableIterable<StructLike> eqDeletesIterable =
+        Deletes.filterDeleted(resultIterable, i -> false, new DeleteCounter());
+    List<StructLike> result = Lists.newArrayList(eqDeletesIterable.iterator());
+
+    // as first two records deleted, expect only those two records marked
+    assertThat(Iterables.transform(result, row -> row.get(3, Boolean.class)))
+        .as("Filter should produce expected rows")
+        .containsExactlyElementsOf(Lists.newArrayList(true, true, false, false));
+
+    assertThat(data.isClosed).isTrue();
+    assertThat(deletePositions.isClosed).isTrue();
+  }
+
+  private static class CheckingClosableIterable<E> implements CloseableIterable<E> {
+    AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Iterable<E> iterable;
+
+    CheckingClosableIterable(Iterable<E> iterable) {
+      this.iterable = iterable;
+    }
+
+    public boolean isClosed() {
+      return isClosed.get();
+    }
+
+    @Override
+    public void close() throws IOException {
+      isClosed.set(true);
+    }
+
+    @Override
+    public CloseableIterator<E> iterator() {
+      Iterator<E> it = iterable.iterator();
+      return new CloseableIterator<E>() {
+
+        @Override
+        public boolean hasNext() {
+          return it.hasNext();
+        }
+
+        @Override
+        public E next() {
+          return it.next();
+        }
+
+        @Override
+        public void close() {
+          isClosed.set(true);
+        }
+      };
+    }
+  }
 }

Reply via email to