This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-file-handle-leak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 32b6f3aa974c3cd7f65ed1bdc3476efdd743d20a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Sep 25 14:50:27 2023 +0800

    Pipe: fix file handle leak when processing with extractor.pattern
---
 .../java/org/apache/iotdb/pipe/api/PipeConnector.java | 10 +++++++---
 .../java/org/apache/iotdb/pipe/api/PipeProcessor.java | 10 +++++++---
 .../api/event/dml/insertion/TsFileInsertionEvent.java | 2 +-
 .../connector/protocol/airgap/IoTDBAirGapConnector.java | 8 ++++++--
 .../protocol/thrift/async/IoTDBThriftAsyncConnector.java | 8 ++++++--
 .../connector/protocol/websocket/WebSocketConnector.java | 12 ++++++++----
 .../event/common/tsfile/PipeTsFileInsertionEvent.java | 15 ++++++++++++++-
 .../event/common/tsfile/TsFileInsertionDataContainer.java | 1 +
 8 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index d95034a207b..3ceb6f73f56 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -128,9 +128,13 @@ public interface PipeConnector extends PipePlugin {
    * @throws Exception the user can throw errors if necessary
    */
   default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
-    for (final TabletInsertionEvent tabletInsertionEvent :
-        tsFileInsertionEvent.toTabletInsertionEvents()) {
-      transfer(tabletInsertionEvent);
+    try {
+      for (final TabletInsertionEvent tabletInsertionEvent :
+          tsFileInsertionEvent.toTabletInsertionEvents()) {
+        transfer(tabletInsertionEvent);
+      }
+    } finally {
+      tsFileInsertionEvent.close();
     }
   }
 
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index b8388992a17..c5d0cfc4e7c 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -105,9 +105,13 @@ public interface PipeProcessor extends PipePlugin {
    */
   default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
       throws Exception {
-    for (final TabletInsertionEvent tabletInsertionEvent :
-        tsFileInsertionEvent.toTabletInsertionEvents()) {
-      process(tabletInsertionEvent, eventCollector);
+    try {
+      for (final TabletInsertionEvent tabletInsertionEvent :
+          tsFileInsertionEvent.toTabletInsertionEvents()) {
+        process(tabletInsertionEvent, eventCollector);
+      }
+    } finally {
+      tsFileInsertionEvent.close();
     }
   }
 
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 815199e6467..b03d8877fb4 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event;
  * {@link TsFileInsertionEvent} is used to define the event of writing TsFile. Event data stores in
  * disks, which is compressed and encoded, and requires IO cost for computational processing.
  */
-public interface TsFileInsertionEvent extends Event {
+public interface TsFileInsertionEvent extends Event, AutoCloseable {
 
   /**
    * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index f6414ef7265..add1c166687 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -237,8 +237,12 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
     }
 
     if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
-      for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
-        transfer(event);
+      try {
+        for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
+          transfer(event);
+        }
+      } finally {
+        tsFileInsertionEvent.close();
       }
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 376d85f354b..42cdd8e7818 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -292,8 +292,12 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
     }
 
     if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
-      for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
-        transfer(event);
+      try {
+        for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
+          transfer(event);
+        }
+      } finally {
+        tsFileInsertionEvent.close();
       }
       return;
     }
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 37b12170582..9ee87b1df54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -117,10 +117,14 @@ public class WebSocketConnector implements PipeConnector {
           tsFileInsertionEvent);
       return;
     }
-    for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
-      long commitId = commitIdGenerator.incrementAndGet();
-      ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
-      server.addEvent(new Pair<>(commitId, event));
+    try {
+      for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
+        long commitId = commitIdGenerator.incrementAndGet();
+        ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
+        server.addEvent(new Pair<>(commitId, event));
+      }
+    } finally {
+      tsFileInsertionEvent.close();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 83642dd7417..0049542ffac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -185,19 +185,32 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
       }
       return dataContainer.toTabletInsertionEvents();
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      close();
+
       final String errorMsg =
           String.format(
               "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
-      Thread.currentThread().interrupt();
       throw new PipeException(errorMsg);
     } catch (IOException e) {
+      close();
+
       final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg);
     }
   }
 
+  /** Release the resource of data container. */
+  @Override
+  public void close() {
+    if (dataContainer != null) {
+      dataContainer.close();
+      dataContainer = null;
+    }
+  }
+
   /////////////////////////// Object ///////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 725aab5b3a1..897e833e30d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -163,6 +163,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
           public boolean hasNext() {
             while (tabletIterator == null || !tabletIterator.hasNext()) {
               if (!deviceMeasurementsMapIterator.hasNext()) {
+                close();
                 return false;
               }
 
