This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e057a10  [HUDI-2715] The BitCaskDiskMap iterator may cause memory leak (#3951)
e057a10 is described below

commit e057a10499729301ebe96d7cd54902b113af1811
Author: Danny Chan <[email protected]>
AuthorDate: Tue Nov 9 15:40:00 2021 +0800

    [HUDI-2715] The BitCaskDiskMap iterator may cause memory leak (#3951)
---
 .../hudi/table/action/compact/HoodieCompactor.java |  3 ++-
 .../apache/hudi/common/util/ClosableIterator.java  | 31 ++++++++++++++++++++++
 .../common/util/collection/BitCaskDiskMap.java     | 10 ++++++-
 .../common/util/collection/LazyFileIterable.java   |  7 ++---
 .../table/format/mor/MergeOnReadInputFormat.java   |  7 +----
 5 files changed, 47 insertions(+), 11 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index ad05876..419f88e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -181,6 +181,7 @@ public abstract class HoodieCompactor<T extends 
HoodieRecordPayload, I, K, O> im
         
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
     if (!scanner.iterator().hasNext()) {
+      scanner.close();
       return new ArrayList<>();
     }
 
@@ -198,6 +199,7 @@ public abstract class HoodieCompactor<T extends 
HoodieRecordPayload, I, K, O> im
       result = compactionHandler.handleInsert(instantTime, 
operation.getPartitionPath(), operation.getFileId(),
           scanner.getRecords());
     }
+    scanner.close();
     Iterable<List<WriteStatus>> resultIterable = () -> result;
     return StreamSupport.stream(resultIterable.spliterator(), 
false).flatMap(Collection::stream).peek(s -> {
       
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
@@ -212,7 +214,6 @@ public abstract class HoodieCompactor<T extends 
HoodieRecordPayload, I, K, O> im
       RuntimeStats runtimeStats = new RuntimeStats();
       
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
       s.getStat().setRuntimeStats(runtimeStats);
-      scanner.close();
     }).collect(toList());
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java
new file mode 100644
index 0000000..9e1d0c2
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.Iterator;
+
+/**
+ * An iterator that gives a chance to release resources.
+ *
+ * @param <R> The return type
+ */
+public interface ClosableIterator<R> extends Iterator<R>, AutoCloseable {
+  @Override
+  void close(); // override to not throw exception
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
index 5f78fa3..289901d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util.collection;
 
 import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
 import org.apache.hudi.common.util.BufferedRandomAccessFile;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -38,9 +39,11 @@ import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.io.Serializable;
 import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
@@ -87,6 +90,8 @@ public final class BitCaskDiskMap<T extends Serializable, R 
extends Serializable
   private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new 
ThreadLocal<>();
   private final Queue<BufferedRandomAccessFile> openedAccessFiles = new 
ConcurrentLinkedQueue<>();
 
+  private final List<ClosableIterator<R>> iterators = new ArrayList<>();
+
   public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) 
throws IOException {
     super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
     this.valueMetadataMap = new ConcurrentHashMap<>();
@@ -150,7 +155,9 @@ public final class BitCaskDiskMap<T extends Serializable, R 
extends Serializable
    */
   @Override
   public Iterator<R> iterator() {
-    return new LazyFileIterable(filePath, valueMetadataMap, 
isCompressionEnabled).iterator();
+    ClosableIterator<R> iterator = new LazyFileIterable(filePath, 
valueMetadataMap, isCompressionEnabled).iterator();
+    this.iterators.add(iterator);
+    return iterator;
   }
 
   /**
@@ -275,6 +282,7 @@ public final class BitCaskDiskMap<T extends Serializable, R 
extends Serializable
         }
       }
       writeOnlyFile.delete();
+      this.iterators.forEach(ClosableIterator::close);
     } catch (Exception e) {
       // delete the file for any sort of exception
       writeOnlyFile.delete();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
index 33d07d5..49d8144 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.util.collection;
 
 import org.apache.hudi.common.util.BufferedRandomAccessFile;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
 
 import java.io.IOException;
@@ -53,7 +54,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
   }
 
   @Override
-  public Iterator<R> iterator() {
+  public ClosableIterator<R> iterator() {
     try {
       return new LazyFileIterator<>(filePath, inMemoryMetadataOfSpilledData);
     } catch (IOException io) {
@@ -64,7 +65,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
   /**
    * Iterator implementation for the iterable defined above.
    */
-  public class LazyFileIterator<T, R> implements Iterator<R> {
+  public class LazyFileIterator<T, R> implements ClosableIterator<R> {
 
     private final String filePath;
     private BufferedRandomAccessFile readOnlyFileHandle;
@@ -111,7 +112,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
       action.accept(next());
     }
 
-    private void close() {
+    public void close() {
       closeHandle();
       Runtime.getRuntime().removeShutdownHook(shutdownThread);
     }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 566d4d3..2bf5bd5 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
@@ -447,12 +448,6 @@ public class MergeOnReadInputFormat
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
-
-  private interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
-    @Override
-    void close(); // override to not throw exception
-  }
-
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;
 

Reply via email to