This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new abc189c  fix channel close bug in merge process (#103)
abc189c is described below

commit abc189cd3389e839485f548cc2aa32d43c21c8e0
Author: Beyyes <[email protected]>
AuthorDate: Wed Mar 27 21:45:34 2019 +0800

    fix channel close bug in merge process (#103)
    
    * fix channel close bug, add file reader remove function invoking in merge 
process and make SysTimeVersionControllerTest more stable
---
 .../db/engine/filenode/FileNodeProcessor.java      |  36 ++++--
 .../db/engine/overflow/io/OverflowProcessor.java   |   2 +
 .../db/engine/overflow/io/OverflowResource.java    |   1 +
 .../iotdb/db/query/control/FileReaderManager.java  |   2 +-
 .../db/query/control/OpenedFilePathsManager.java   |   4 +-
 .../db/query/factory/SeriesReaderFactory.java      |   1 +
 .../version/SysTimeVersionControllerTest.java      |  14 ++-
 .../db/integration/IoTDBFlushQueryMergeTest.java   | 130 +++++++++++++++++++++
 8 files changed, 176 insertions(+), 14 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 419648d..72348b6 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -59,6 +59,7 @@ import 
org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
 import org.apache.iotdb.db.engine.pool.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -76,6 +77,7 @@ import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IReader;
 import org.apache.iotdb.db.sync.conf.Constans;
@@ -748,7 +750,7 @@ public class FileNodeProcessor extends Processor implements 
IStatistic {
    * query data.
    */
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, 
String measurementId,
-      QueryContext context) throws FileNodeProcessorException {
+         QueryContext context) throws FileNodeProcessorException {
     // query overflow data
     MeasurementSchema mSchema;
     TSDataType dataType;
@@ -1352,6 +1354,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
 
         // overflow switch from merge to work
         overflowProcessor.switchMergeToWork();
+
         // write status to file
         isMerging = FileNodeProcessorStatus.NONE;
         synchronized (fileNodeProcessorStore) {
@@ -1406,15 +1409,19 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
     }
   }
 
-  private void deleteBufferWriteFiles(List<File> bufferwriteDirList, 
Set<String> bufferFiles) {
+  private void deleteBufferWriteFiles(List<File> bufferwriteDirList, 
Set<String> bufferFiles)
+      throws IOException {
     for (File bufferwriteDir : bufferwriteDirList) {
       File[] files = bufferwriteDir.listFiles();
       if (files == null) {
         continue;
       }
       for (File file : files) {
-        if (!bufferFiles.contains(file.getPath()) && !file.delete()) {
-          LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
+        if (!bufferFiles.contains(file.getPath())) {
+          
FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPath());
+          if (!file.delete()) {
+            LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
+          }
         }
       }
     }
@@ -1442,6 +1449,8 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
     mergeDeleteLock.lock();
     QueryContext context = new QueryContext();
     try {
+      
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
+          true);
       for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
         // query one deviceId
         List<Path> pathList = new ArrayList<>();
@@ -1471,11 +1480,17 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
               .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
                   TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
           SingleSeriesExpression seriesFilter = new 
SingleSeriesExpression(path, timeFilter);
+
+          for (OverflowInsertFile overflowInsertFile : 
overflowSeriesDataSource.getOverflowInsertFileList()) {
+            
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
+                false);
+          }
+
           IReader seriesReader = SeriesReaderFactory.getInstance()
               .createSeriesReaderForMerge(backupIntervalFile,
                   overflowSeriesDataSource, seriesFilter, context);
           numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, 
dataType,
-              startTimeMap, endTimeMap);
+              startTimeMap, endTimeMap, overflowSeriesDataSource);
         }
         if (mergeIsChunkGroupHasData) {
           // end the new rowGroupMetadata
@@ -1485,6 +1500,9 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         }
       }
     } finally {
+      
FileReaderManager.getInstance().decreaseFileReaderReference(backupIntervalFile.getFilePath(),
+          true);
+
       if (mergeDeleteLock.isLocked()) {
         mergeDeleteLock.unlock();
       }
@@ -1505,7 +1523,8 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
 
   private int queryAndWriteSeries(IReader seriesReader, Path path,
       SingleSeriesExpression seriesFilter, TSDataType dataType,
-      Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
+      Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
+      OverflowSeriesDataSource overflowSeriesDataSource)
       throws IOException {
     int numOfChunk = 0;
     try {
@@ -1551,7 +1570,10 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         seriesWriterImpl.writeToFileWriter(mergeFileWriter);
       }
     } finally {
-      seriesReader.close();
+      for (OverflowInsertFile overflowInsertFile : 
overflowSeriesDataSource.getOverflowInsertFileList()) {
+        
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+                false);
+      }
     }
     return numOfChunk;
   }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index b4669ec..0debfe4 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -409,6 +410,7 @@ public class OverflowProcessor extends Processor {
 
   public void switchMergeToWork() throws IOException {
     if (mergeResource != null) {
+      
FileReaderManager.getInstance().closeFileAndRemoveReader(mergeResource.getInsertFilePath());
       mergeResource.close();
       mergeResource.deleteResource();
       mergeResource = null;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 67916b2..fbaf317 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 90e1932..461ff0b 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -91,8 +91,8 @@ public class FileReaderManager implements IService {
 
     executorService.scheduleAtFixedRate(() -> {
       synchronized (this) {
-        clearMap(unclosedFileReaderMap, unclosedReferenceMap);
         clearMap(closedFileReaderMap, closedReferenceMap);
+        clearMap(unclosedFileReaderMap, unclosedReferenceMap);
       }
     }, 0, examinePeriod, TimeUnit.MILLISECONDS);
   }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 585d85d..bc6ee38 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -94,11 +94,11 @@ public class OpenedFilePathsManager {
       jobIdContainer.remove();
 
       for (String filePath : closedFilePathsMap.get(jobId)) {
-        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
false);
+        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
true);
       }
       closedFilePathsMap.remove(jobId);
       for (String filePath : unclosedFilePathsMap.get(jobId)) {
-        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
true);
+        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
false);
       }
       unclosedFilePathsMap.remove(jobId);
     }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 0e14b4b..42e81f2 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -160,6 +160,7 @@ public class SeriesReaderFactory {
       SingleSeriesExpression singleSeriesExpression,
       QueryContext context)
       throws IOException {
+
     TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
         .get(fileNode.getFilePath(), true);
     ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
index 1912d8a..f494da7 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
@@ -29,9 +29,15 @@ public class SysTimeVersionControllerTest {
   public void test() {
     VersionController versionController = SysTimeVersionController.INSTANCE;
     long diff = versionController.currVersion() - System.currentTimeMillis();
-    // TODO these comparisons can fail in very rare conditions, how to fix?
-    assertTrue(diff >= -2 && diff <= 2);
-    diff = versionController.nextVersion() - System.currentTimeMillis();
-    assertTrue(diff >= -2 && diff <= 2);
+    // to aovid the test failure on a poor machine, we bear 200ms difference 
here.
+    assertTrue(diff >= -200 && diff <= 200);
+    diff = versionController.nextVersion();
+    try {
+      Thread.sleep(200);
+      diff -= System.currentTimeMillis();
+      assertTrue(diff >= -1000 && diff <= -200);
+    } catch (InterruptedException e) {
+      //do nothing
+    }
   }
 }
\ No newline at end of file
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
new file mode 100644
index 0000000..39691bc
--- /dev/null
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.sql.*;
+import static org.junit.Assert.fail;
+
+public class IoTDBFlushQueryMergeTest {
+
+  private static IoTDB daemon;
+  private static String[] sqls = new String[]{
+      "SET STORAGE GROUP TO root.vehicle.d0",
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+
+      "insert into root.vehicle.d0(timestamp,s0) values(1,101)",
+      "insert into root.vehicle.d0(timestamp,s0) values(2,198)",
+      "insert into root.vehicle.d0(timestamp,s0) values(100,99)",
+      "insert into root.vehicle.d0(timestamp,s0) values(101,99)",
+      "insert into root.vehicle.d0(timestamp,s0) values(102,80)",
+      "insert into root.vehicle.d0(timestamp,s0) values(103,99)",
+      "insert into root.vehicle.d0(timestamp,s0) values(104,90)",
+      "insert into root.vehicle.d0(timestamp,s0) values(105,99)",
+      "insert into root.vehicle.d0(timestamp,s0) values(106,99)",
+      "flush",
+      "insert into root.vehicle.d0(timestamp,s0) values(2,10000)",
+      "insert into root.vehicle.d0(timestamp,s0) values(50,10000)",
+      "insert into root.vehicle.d0(timestamp,s0) values(1000,22222)",
+
+  };
+
+  private static IoTDBConfig iotDBConfig = 
IoTDBDescriptor.getInstance().getConfig();
+  private static long overflowFileSizeThreshold;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+    overflowFileSizeThreshold = iotDBConfig.getOverflowFileSizeThreshold();
+    iotDBConfig.setOverflowFileSizeThreshold(0);
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    daemon.stop();
+    iotDBConfig.setOverflowFileSizeThreshold(overflowFileSizeThreshold);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  private static void insertData() throws ClassNotFoundException, SQLException 
{
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    Connection connection = null;
+    try {
+      connection = DriverManager
+          .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", 
"root");
+      Statement statement = connection.createStatement();
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+      statement.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    Connection connection = null;
+    try {
+      connection = DriverManager
+          .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", 
"root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select * from root");
+      Assert.assertTrue(hasResultSet);
+
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        cnt++;
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      statement.execute("merge");
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+}

Reply via email to