This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-file-handle-leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 32b6f3aa974c3cd7f65ed1bdc3476efdd743d20a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Sep 25 14:50:27 2023 +0800

    Pipe: fix file handle leak when processing with extractor.pattern
---
 .../java/org/apache/iotdb/pipe/api/PipeConnector.java     | 10 +++++++---
 .../java/org/apache/iotdb/pipe/api/PipeProcessor.java     | 10 +++++++---
 .../api/event/dml/insertion/TsFileInsertionEvent.java     |  2 +-
 .../connector/protocol/airgap/IoTDBAirGapConnector.java   |  8 ++++++--
 .../protocol/thrift/async/IoTDBThriftAsyncConnector.java  |  8 ++++++--
 .../connector/protocol/websocket/WebSocketConnector.java  | 12 ++++++++----
 .../event/common/tsfile/PipeTsFileInsertionEvent.java     | 15 ++++++++++++++-
 .../event/common/tsfile/TsFileInsertionDataContainer.java |  1 +
 8 files changed, 50 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index d95034a207b..3ceb6f73f56 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -128,9 +128,13 @@ public interface PipeConnector extends PipePlugin {
    * @throws Exception the user can throw errors if necessary
    */
   default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    for (final TabletInsertionEvent tabletInsertionEvent :
-        tsFileInsertionEvent.toTabletInsertionEvents()) {
-      transfer(tabletInsertionEvent);
+    try {
+      for (final TabletInsertionEvent tabletInsertionEvent :
+          tsFileInsertionEvent.toTabletInsertionEvents()) {
+        transfer(tabletInsertionEvent);
+      }
+    } finally {
+      tsFileInsertionEvent.close();
     }
   }
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index b8388992a17..c5d0cfc4e7c 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -105,9 +105,13 @@ public interface PipeProcessor extends PipePlugin {
    */
   default void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
       throws Exception {
-    for (final TabletInsertionEvent tabletInsertionEvent :
-        tsFileInsertionEvent.toTabletInsertionEvents()) {
-      process(tabletInsertionEvent, eventCollector);
+    try {
+      for (final TabletInsertionEvent tabletInsertionEvent :
+          tsFileInsertionEvent.toTabletInsertionEvents()) {
+        process(tabletInsertionEvent, eventCollector);
+      }
+    } finally {
+      tsFileInsertionEvent.close();
     }
   }
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 815199e6467..b03d8877fb4 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event;
  * {@link TsFileInsertionEvent} is used to define the event of writing TsFile. 
Event data stores in
  * disks, which is compressed and encoded, and requires IO cost for 
computational processing.
  */
-public interface TsFileInsertionEvent extends Event {
+public interface TsFileInsertionEvent extends Event, AutoCloseable {
 
   /**
    * The method is used to convert the TsFileInsertionEvent into several 
TabletInsertionEvents.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index f6414ef7265..add1c166687 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -237,8 +237,12 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
     }
 
     if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
-      for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        transfer(event);
+      try {
+        for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
+          transfer(event);
+        }
+      } finally {
+        tsFileInsertionEvent.close();
       }
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 376d85f354b..42cdd8e7818 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -292,8 +292,12 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     }
 
     if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
-      for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        transfer(event);
+      try {
+        for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
+          transfer(event);
+        }
+      } finally {
+        tsFileInsertionEvent.close();
       }
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 37b12170582..9ee87b1df54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -117,10 +117,14 @@ public class WebSocketConnector implements PipeConnector {
           tsFileInsertionEvent);
       return;
     }
-    for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-      long commitId = commitIdGenerator.incrementAndGet();
-      ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
-      server.addEvent(new Pair<>(commitId, event));
+    try {
+      for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
+        long commitId = commitIdGenerator.incrementAndGet();
+        ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
+        server.addEvent(new Pair<>(commitId, event));
+      }
+    } finally {
+      tsFileInsertionEvent.close();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 83642dd7417..0049542ffac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -185,19 +185,32 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
       }
       return dataContainer.toTabletInsertionEvents();
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      close();
+
       final String errorMsg =
           String.format(
               "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
-      Thread.currentThread().interrupt();
       throw new PipeException(errorMsg);
     } catch (IOException e) {
+      close();
+
       final String errorMsg = String.format("Read TsFile %s error.", 
resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg);
     }
   }
 
+  /** Release the resource of data container. */
+  @Override
+  public void close() {
+    if (dataContainer != null) {
+      dataContainer.close();
+      dataContainer = null;
+    }
+  }
+
   /////////////////////////// Object ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 725aab5b3a1..897e833e30d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -163,6 +163,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
           public boolean hasNext() {
             while (tabletIterator == null || !tabletIterator.hasNext()) {
               if (!deviceMeasurementsMapIterator.hasNext()) {
+                close();
                 return false;
               }
 

Reply via email to