This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 779faef77c2 Pipe: fix file handle leak when processing with
extractor.pattern (#11209) (#11213)
779faef77c2 is described below
commit 779faef77c2e9cc11a3f608c5a96ac86a9f440f1
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Sep 25 17:31:08 2023 +0800
Pipe: fix file handle leak when processing with extractor.pattern (#11209)
(#11213)
(cherry picked from commit 3f8c344b3a837a01fbbf3e46b99da0c4dcd2a303)
---
.../java/org/apache/iotdb/pipe/api/PipeConnector.java | 10 +++++++---
.../java/org/apache/iotdb/pipe/api/PipeProcessor.java | 10 +++++++---
.../api/event/dml/insertion/TsFileInsertionEvent.java | 2 +-
.../connector/protocol/airgap/IoTDBAirGapConnector.java | 8 ++++++--
.../protocol/thrift/async/IoTDBThriftAsyncConnector.java | 8 ++++++--
.../connector/protocol/websocket/WebSocketConnector.java | 12 ++++++++----
.../event/common/tsfile/PipeTsFileInsertionEvent.java | 15 ++++++++++++++-
.../event/common/tsfile/TsFileInsertionDataContainer.java | 1 +
8 files changed, 50 insertions(+), 16 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index d95034a207b..3ceb6f73f56 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -128,9 +128,13 @@ public interface PipeConnector extends PipePlugin {
* @throws Exception the user can throw errors if necessary
*/
default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- transfer(tabletInsertionEvent);
+ try {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ transfer(tabletInsertionEvent);
+ }
+ } finally {
+ tsFileInsertionEvent.close();
}
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index b8388992a17..c5d0cfc4e7c 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -105,9 +105,13 @@ public interface PipeProcessor extends PipePlugin {
*/
default void process(TsFileInsertionEvent tsFileInsertionEvent,
EventCollector eventCollector)
throws Exception {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ try {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ process(tabletInsertionEvent, eventCollector);
+ }
+ } finally {
+ tsFileInsertionEvent.close();
}
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 815199e6467..b03d8877fb4 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event;
* {@link TsFileInsertionEvent} is used to define the event of writing TsFile.
Event data stores in
* disks, which is compressed and encoded, and requires IO cost for
computational processing.
*/
-public interface TsFileInsertionEvent extends Event {
+public interface TsFileInsertionEvent extends Event, AutoCloseable {
/**
* The method is used to convert the TsFileInsertionEvent into several
TabletInsertionEvents.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index f6414ef7265..add1c166687 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -237,8 +237,12 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
}
if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
- for (final TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- transfer(event);
+ try {
+ for (final TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
+ transfer(event);
+ }
+ } finally {
+ tsFileInsertionEvent.close();
}
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 376d85f354b..42cdd8e7818 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -292,8 +292,12 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
}
if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
- for (final TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- transfer(event);
+ try {
+ for (final TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
+ transfer(event);
+ }
+ } finally {
+ tsFileInsertionEvent.close();
}
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 37b12170582..9ee87b1df54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -117,10 +117,14 @@ public class WebSocketConnector implements PipeConnector {
tsFileInsertionEvent);
return;
}
- for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- long commitId = commitIdGenerator.incrementAndGet();
- ((EnrichedEvent)
event).increaseReferenceCount(WebSocketConnector.class.getName());
- server.addEvent(new Pair<>(commitId, event));
+ try {
+ for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
+ long commitId = commitIdGenerator.incrementAndGet();
+ ((EnrichedEvent)
event).increaseReferenceCount(WebSocketConnector.class.getName());
+ server.addEvent(new Pair<>(commitId, event));
+ }
+ } finally {
+ tsFileInsertionEvent.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 83642dd7417..0049542ffac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -185,19 +185,32 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
return dataContainer.toTabletInsertionEvents();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ close();
+
final String errorMsg =
String.format(
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath());
LOGGER.warn(errorMsg, e);
- Thread.currentThread().interrupt();
throw new PipeException(errorMsg);
} catch (IOException e) {
+ close();
+
final String errorMsg = String.format("Read TsFile %s error.",
resource.getTsFilePath());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
}
}
+ /** Release the resource of data container. */
+ @Override
+ public void close() {
+ if (dataContainer != null) {
+ dataContainer.close();
+ dataContainer = null;
+ }
+ }
+
/////////////////////////// Object ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6c118880e77..6ef981b60f3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -163,6 +163,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
public boolean hasNext() {
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
+ close();
return false;
}