This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-parallel-connector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 266d251c7bc2e1a693e85cc695b70d91a2c07cac Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jun 8 01:07:39 2023 +0800 TsFileInsertionDataContain: release file when exception occurred --- .../tsfile/TsFileInsertionDataContainer.java | 42 ++++++++++++++-------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 6426e8dccad..c56c33fc035 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -54,11 +54,16 @@ public class TsFileInsertionDataContainer implements AutoCloseable { public TsFileInsertionDataContainer(File tsFile, String pattern) throws IOException { this.pattern = pattern; - tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath()); - tsFileReader = new TsFileReader(tsFileSequenceReader); - - deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern().entrySet().iterator(); - measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); + try { + tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath()); + tsFileReader = new TsFileReader(tsFileSequenceReader); + + deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern().entrySet().iterator(); + measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); + } catch (IOException e) { + close(); + throw e; + } } private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() throws IOException { @@ -116,11 +121,13 @@ public class TsFileInsertionDataContainer implements AutoCloseable { @Override public TabletInsertionEvent next() { if (!hasNext()) { + close(); throw new NoSuchElementException(); } while (tabletIterator == null || !tabletIterator.hasNext()) { if (!deviceMeasurementsMapIterator.hasNext()) { + close(); throw new NoSuchElementException(); } @@ -131,6 +138,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable { new TsFileInsertionDataTabletIterator( tsFileReader, measurementDataTypeMap, entry.getKey(), entry.getValue()); } catch (IOException e) { + close(); throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); } } @@ -139,11 +147,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable { new PipeRawTabletInsertionEvent(tabletIterator.next()); if (!hasNext()) { - try { - close(); - } catch (Exception e) { - LOGGER.warn("Failed to close TsFileInsertionDataContainer", e); - } + close(); } return next; @@ -152,12 +156,20 @@ public class TsFileInsertionDataContainer implements AutoCloseable { } @Override - public void close() throws Exception { - if (tsFileReader != null) { - tsFileReader.close(); + public void close() { + try { + if (tsFileReader != null) { + tsFileReader.close(); + } + } catch (IOException e) { + LOGGER.warn("Failed to close TsFileReader", e); } - if (tsFileSequenceReader != null) { - tsFileSequenceReader.close(); + try { + if (tsFileSequenceReader != null) { + tsFileSequenceReader.close(); + } + } catch (IOException e) { + LOGGER.warn("Failed to close TsFileSequenceReader", e); } } }
