This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch fix_closed_channel_issue
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 85576146cc08235188ffd4f1cd477be70cbf2717
Author: CGF <[email protected]>
AuthorDate: Tue Mar 19 11:48:29 2019 +0800

    fix channel close bug
---
 .../iotdb/db/engine/filenode/FileNodeProcessor.java    | 18 +++++++++++++-----
 .../iotdb/db/query/control/FileReaderManager.java      |  2 +-
 .../iotdb/db/query/factory/SeriesReaderFactory.java    | 11 +++++++++++
 3 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 6f56ebc..67a5ea2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -57,7 +57,6 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
 import org.apache.iotdb.db.engine.pool.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
@@ -74,6 +73,7 @@ import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IReader;
 import org.apache.iotdb.db.utils.MemUtils;
@@ -747,7 +747,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * query data.
    */
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, 
String measurementId,
-      QueryContext context) throws FileNodeProcessorException {
+                                                         QueryContext context) 
throws FileNodeProcessorException {
     // query overflow data
     MeasurementSchema mSchema;
     TSDataType dataType;
@@ -1469,11 +1469,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
                   TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
           SingleSeriesExpression seriesFilter = new 
SingleSeriesExpression(path, timeFilter);
+
           IReader seriesReader = SeriesReaderFactory.getInstance()
               .createSeriesReaderForMerge(backupIntervalFile,
                   overflowSeriesDataSource, seriesFilter, context);
+
           numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, 
dataType,
-              startTimeMap, endTimeMap);
+              startTimeMap, endTimeMap, backupIntervalFile, 
overflowSeriesDataSource);
         }
         if (mergeIsChunkGroupHasData) {
           // end the new rowGroupMetadata
@@ -1503,7 +1505,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
   private int queryAndWriteSeries(IReader seriesReader, Path path,
       SingleSeriesExpression seriesFilter, TSDataType dataType,
-      Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
+      Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
+      TsFileResource tsFileResource, OverflowSeriesDataSource 
overflowSeriesDataSource)
       throws IOException {
     int numOfChunk = 0;
     try {
@@ -1549,7 +1552,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         seriesWriterImpl.writeToFileWriter(mergeFileWriter);
       }
     } finally {
-      seriesReader.close();
+      
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource.getFilePath(),
+              true);
+      for (OverflowInsertFile overflowInsertFile : 
overflowSeriesDataSource.getOverflowInsertFileList()) {
+        
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+                false);
+      }
     }
     return numOfChunk;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 90e1932..461ff0b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -91,8 +91,8 @@ public class FileReaderManager implements IService {
 
     executorService.scheduleAtFixedRate(() -> {
       synchronized (this) {
-        clearMap(unclosedFileReaderMap, unclosedReferenceMap);
         clearMap(closedFileReaderMap, closedReferenceMap);
+        clearMap(unclosedFileReaderMap, unclosedReferenceMap);
       }
     }, 0, examinePeriod, TimeUnit.MILLISECONDS);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 0e14b4b..2329540 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -79,6 +79,11 @@ public class SeriesReaderFactory {
     for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
         .getOverflowInsertFileList()) {
 
+      // add current overflowInsertFile reference to FileReaderManager
+      // to avoid that this reader is cleared in fix time
+      
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
+              false);
+
       // store only one opened file stream into manager, to avoid too many 
opened files
       TsFileSequenceReader unClosedTsFileReader = 
FileReaderManager.getInstance()
           .get(overflowInsertFile.getFilePath(), false);
@@ -160,6 +165,12 @@ public class SeriesReaderFactory {
       SingleSeriesExpression singleSeriesExpression,
       QueryContext context)
       throws IOException {
+
+    // add current tsfile reference to FileReaderManager
+    // to avoid that this reader is cleared in fix time
+    
FileReaderManager.getInstance().increaseFileReaderReference(fileNode.getFilePath(),
+            true);
+
     TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
         .get(fileNode.getFilePath(), true);
     ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);

Reply via email to