This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch fix_closed_channel_issue
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 85576146cc08235188ffd4f1cd477be70cbf2717 Author: CGF <[email protected]> AuthorDate: Tue Mar 19 11:48:29 2019 +0800 fix channel close bug --- .../iotdb/db/engine/filenode/FileNodeProcessor.java | 18 +++++++++++++----- .../iotdb/db/query/control/FileReaderManager.java | 2 +- .../iotdb/db/query/factory/SeriesReaderFactory.java | 11 +++++++++++ 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 6f56ebc..67a5ea2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -57,7 +57,6 @@ import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor; import org.apache.iotdb.db.engine.pool.MergeManager; import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; @@ -74,6 +73,7 @@ import org.apache.iotdb.db.monitor.IStatistic; import org.apache.iotdb.db.monitor.MonitorConstants; import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.utils.MemUtils; @@ -747,7 +747,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { * query data. 
*/ public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId, - QueryContext context) throws FileNodeProcessorException { + QueryContext context) throws FileNodeProcessorException { // query overflow data MeasurementSchema mSchema; TSDataType dataType; @@ -1469,11 +1469,13 @@ public class FileNodeProcessor extends Processor implements IStatistic { .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)), TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId))); SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter); + IReader seriesReader = SeriesReaderFactory.getInstance() .createSeriesReaderForMerge(backupIntervalFile, overflowSeriesDataSource, seriesFilter, context); + numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, dataType, - startTimeMap, endTimeMap); + startTimeMap, endTimeMap, backupIntervalFile, overflowSeriesDataSource); } if (mergeIsChunkGroupHasData) { // end the new rowGroupMetadata @@ -1503,7 +1505,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { private int queryAndWriteSeries(IReader seriesReader, Path path, SingleSeriesExpression seriesFilter, TSDataType dataType, - Map<String, Long> startTimeMap, Map<String, Long> endTimeMap) + Map<String, Long> startTimeMap, Map<String, Long> endTimeMap, + TsFileResource tsFileResource, OverflowSeriesDataSource overflowSeriesDataSource) throws IOException { int numOfChunk = 0; try { @@ -1549,7 +1552,12 @@ public class FileNodeProcessor extends Processor implements IStatistic { seriesWriterImpl.writeToFileWriter(mergeFileWriter); } } finally { - seriesReader.close(); + FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource.getFilePath(), + true); + for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) { + FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(), + false); + } } return numOfChunk; 
} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java index 90e1932..461ff0b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java @@ -91,8 +91,8 @@ public class FileReaderManager implements IService { executorService.scheduleAtFixedRate(() -> { synchronized (this) { - clearMap(unclosedFileReaderMap, unclosedReferenceMap); clearMap(closedFileReaderMap, closedReferenceMap); + clearMap(unclosedFileReaderMap, unclosedReferenceMap); } }, 0, examinePeriod, TimeUnit.MILLISECONDS); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java index 0e14b4b..2329540 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java @@ -79,6 +79,11 @@ public class SeriesReaderFactory { for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource .getOverflowInsertFileList()) { + // add current overflowInsertFile reference to FileReaderManager + // to avoid that this reader is cleared in fix time + FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(), + false); + // store only one opened file stream into manager, to avoid too many opened files TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance() .get(overflowInsertFile.getFilePath(), false); @@ -160,6 +165,12 @@ public class SeriesReaderFactory { SingleSeriesExpression singleSeriesExpression, QueryContext context) throws IOException { + + // add current tsfile reference to FileReaderManager + // to avoid that this reader is cleared in fix time + 
FileReaderManager.getInstance().increaseFileReaderReference(fileNode.getFilePath(), + true); + TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance() .get(fileNode.getFilePath(), true); ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
