This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b0e0a1d4d3a3fc2378fd8237a286f99bf8d864a8
Author: HTHou <[email protected]>
AuthorDate: Tue Sep 26 19:26:16 2023 +0800

    Revert "Pipe: fix file handle leak when processing with extractor.pattern 
(#11209) (#11213)"
    
    This reverts commit 779faef77c2e9cc11a3f608c5a96ac86a9f440f1.
---
 .../java/org/apache/iotdb/pipe/api/PipeConnector.java     | 10 +++-------
 .../java/org/apache/iotdb/pipe/api/PipeProcessor.java     | 10 +++-------
 .../api/event/dml/insertion/TsFileInsertionEvent.java     |  2 +-
 .../connector/protocol/airgap/IoTDBAirGapConnector.java   |  8 ++------
 .../protocol/thrift/async/IoTDBThriftAsyncConnector.java  |  8 ++------
 .../connector/protocol/websocket/WebSocketConnector.java  | 12 ++++--------
 .../event/common/tsfile/PipeTsFileInsertionEvent.java     | 15 +--------------
 .../event/common/tsfile/TsFileInsertionDataContainer.java |  1 -
 8 files changed, 16 insertions(+), 50 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 3ceb6f73f56..d95034a207b 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -128,13 +128,9 @@ public interface PipeConnector extends PipePlugin {
    * @throws Exception the user can throw errors if necessary
    */
   default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    try {
-      for (final TabletInsertionEvent tabletInsertionEvent :
-          tsFileInsertionEvent.toTabletInsertionEvents()) {
-        transfer(tabletInsertionEvent);
-      }
-    } finally {
-      tsFileInsertionEvent.close();
+    for (final TabletInsertionEvent tabletInsertionEvent :
+        tsFileInsertionEvent.toTabletInsertionEvents()) {
+      transfer(tabletInsertionEvent);
     }
   }
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index c5d0cfc4e7c..b8388992a17 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -105,13 +105,9 @@ public interface PipeProcessor extends PipePlugin {
    */
   default void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
       throws Exception {
-    try {
-      for (final TabletInsertionEvent tabletInsertionEvent :
-          tsFileInsertionEvent.toTabletInsertionEvents()) {
-        process(tabletInsertionEvent, eventCollector);
-      }
-    } finally {
-      tsFileInsertionEvent.close();
+    for (final TabletInsertionEvent tabletInsertionEvent :
+        tsFileInsertionEvent.toTabletInsertionEvents()) {
+      process(tabletInsertionEvent, eventCollector);
     }
   }
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index b03d8877fb4..815199e6467 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event;
  * {@link TsFileInsertionEvent} is used to define the event of writing TsFile. 
Event data stores in
  * disks, which is compressed and encoded, and requires IO cost for 
computational processing.
  */
-public interface TsFileInsertionEvent extends Event, AutoCloseable {
+public interface TsFileInsertionEvent extends Event {
 
   /**
    * The method is used to convert the TsFileInsertionEvent into several 
TabletInsertionEvents.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index add1c166687..f6414ef7265 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -237,12 +237,8 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
     }
 
     if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
-      try {
-        for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-          transfer(event);
-        }
-      } finally {
-        tsFileInsertionEvent.close();
+      for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
+        transfer(event);
       }
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 42cdd8e7818..376d85f354b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -292,12 +292,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     }
 
     if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
-      try {
-        for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-          transfer(event);
-        }
-      } finally {
-        tsFileInsertionEvent.close();
+      for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
+        transfer(event);
       }
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 9ee87b1df54..37b12170582 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -117,14 +117,10 @@ public class WebSocketConnector implements PipeConnector {
           tsFileInsertionEvent);
       return;
     }
-    try {
-      for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        long commitId = commitIdGenerator.incrementAndGet();
-        ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
-        server.addEvent(new Pair<>(commitId, event));
-      }
-    } finally {
-      tsFileInsertionEvent.close();
+    for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
+      long commitId = commitIdGenerator.incrementAndGet();
+      ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
+      server.addEvent(new Pair<>(commitId, event));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 0049542ffac..83642dd7417 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -185,32 +185,19 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
       }
       return dataContainer.toTabletInsertionEvents();
     } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      close();
-
       final String errorMsg =
           String.format(
               "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
+      Thread.currentThread().interrupt();
       throw new PipeException(errorMsg);
     } catch (IOException e) {
-      close();
-
       final String errorMsg = String.format("Read TsFile %s error.", 
resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg);
     }
   }
 
-  /** Release the resource of data container. */
-  @Override
-  public void close() {
-    if (dataContainer != null) {
-      dataContainer.close();
-      dataContainer = null;
-    }
-  }
-
   /////////////////////////// Object ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6ef981b60f3..6c118880e77 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -163,7 +163,6 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
           public boolean hasNext() {
             while (tabletIterator == null || !tabletIterator.hasNext()) {
               if (!deviceMeasurementsMapIterator.hasNext()) {
-                close();
                 return false;
               }
 

Reply via email to