This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.2.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b0e0a1d4d3a3fc2378fd8237a286f99bf8d864a8 Author: HTHou <[email protected]> AuthorDate: Tue Sep 26 19:26:16 2023 +0800 Revert "Pipe: fix file handle leak when processing with extractor.pattern (#11209) (#11213)" This reverts commit 779faef77c2e9cc11a3f608c5a96ac86a9f440f1. --- .../java/org/apache/iotdb/pipe/api/PipeConnector.java | 10 +++------- .../java/org/apache/iotdb/pipe/api/PipeProcessor.java | 10 +++------- .../api/event/dml/insertion/TsFileInsertionEvent.java | 2 +- .../connector/protocol/airgap/IoTDBAirGapConnector.java | 8 ++------ .../protocol/thrift/async/IoTDBThriftAsyncConnector.java | 8 ++------ .../connector/protocol/websocket/WebSocketConnector.java | 12 ++++-------- .../event/common/tsfile/PipeTsFileInsertionEvent.java | 15 +-------------- .../event/common/tsfile/TsFileInsertionDataContainer.java | 1 - 8 files changed, 16 insertions(+), 50 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java index 3ceb6f73f56..d95034a207b 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java @@ -128,13 +128,9 @@ public interface PipeConnector extends PipePlugin { * @throws Exception the user can throw errors if necessary */ default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { - try { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - transfer(tabletInsertionEvent); - } - } finally { - tsFileInsertionEvent.close(); + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + transfer(tabletInsertionEvent); } } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java index c5d0cfc4e7c..b8388992a17 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java @@ -105,13 +105,9 @@ public interface PipeProcessor extends PipePlugin { */ default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws Exception { - try { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - process(tabletInsertionEvent, eventCollector); - } - } finally { - tsFileInsertionEvent.close(); + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); } } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java index b03d8877fb4..815199e6467 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java @@ -25,7 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event; * {@link TsFileInsertionEvent} is used to define the event of writing TsFile. Event data stores in * disks, which is compressed and encoded, and requires IO cost for computational processing. */ -public interface TsFileInsertionEvent extends Event, AutoCloseable { +public interface TsFileInsertionEvent extends Event { /** * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index add1c166687..f6414ef7265 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -237,12 +237,8 @@ public class IoTDBAirGapConnector extends IoTDBConnector { } if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { - try { - for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { - transfer(event); - } - } finally { - tsFileInsertionEvent.close(); + for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { + transfer(event); } return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index 42cdd8e7818..376d85f354b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@ -292,12 +292,8 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { } if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { - try { - for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { - transfer(event); - } - } finally { - tsFileInsertionEvent.close(); + for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { + transfer(event); } return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index 9ee87b1df54..37b12170582 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -117,14 +117,10 @@ public class WebSocketConnector implements PipeConnector { tsFileInsertionEvent); return; } - try { - for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { - long commitId = commitIdGenerator.incrementAndGet(); - ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName()); - server.addEvent(new Pair<>(commitId, event)); - } - } finally { - tsFileInsertionEvent.close(); + for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { + long commitId = commitIdGenerator.incrementAndGet(); + ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName()); + server.addEvent(new Pair<>(commitId, event)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 0049542ffac..83642dd7417 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -185,32 +185,19 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns } return dataContainer.toTabletInsertionEvents(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - close(); - final String errorMsg = String.format( "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath()); LOGGER.warn(errorMsg, e); + Thread.currentThread().interrupt(); throw new PipeException(errorMsg); } catch (IOException e) { - close(); - final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath()); LOGGER.warn(errorMsg, e); throw new PipeException(errorMsg); } } - /** Release the resource of data container. */ - @Override - public void close() { - if (dataContainer != null) { - dataContainer.close(); - dataContainer = null; - } - } - /////////////////////////// Object /////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 6ef981b60f3..6c118880e77 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -163,7 +163,6 @@ public class TsFileInsertionDataContainer implements AutoCloseable { public boolean hasNext() { while (tabletIterator == null || !tabletIterator.hasNext()) { if (!deviceMeasurementsMapIterator.hasNext()) { - close(); return false; }
