This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 209c66c  Add force stop storage engine interface (#1289)
209c66c is described below

commit 209c66c9426c5549c8e0803ad99ba5d3b75ba907
Author: Jialin Qiao <[email protected]>
AuthorDate: Fri Jun 19 15:58:34 2020 +0800

    Add force stop storage engine interface (#1289)
    
    * add testStop for restart IT
    Co-authored-by: HTHou <[email protected]>
---
 docs/UserGuide/Client/Status Codes.md              |   2 +
 docs/zh/UserGuide/Client/Status Codes.md           |   2 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  30 ++-
 .../engine/storagegroup/StorageGroupProcessor.java |  19 ++
 .../db/engine/storagegroup/TsFileProcessor.java    |  12 +
 .../ShutdownException.java}                        |  36 ++-
 .../org/apache/iotdb/db/metadata/MManager.java     |   1 +
 .../java/org/apache/iotdb/db/service/IService.java |   3 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   8 +
 .../apache/iotdb/db/service/RegisterManager.java   |  15 ++
 .../writelog/manager/MultiFileLogNodeManager.java  |  57 ++---
 .../iotdb/db/writelog/node/WriteLogNode.java       |   1 +
 .../db/integration/IoTDBRecoverUnclosedIT.java     | 270 +++++++++++++++++++++
 .../iotdb/db/integration/IoTDBRestartIT.java       | 148 ++++++++++-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  10 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 16 files changed, 548 insertions(+), 69 deletions(-)

diff --git a/docs/UserGuide/Client/Status Codes.md 
b/docs/UserGuide/Client/Status Codes.md
index eafdd0b..55245a4 100644
--- a/docs/UserGuide/Client/Status Codes.md     
+++ b/docs/UserGuide/Client/Status Codes.md     
@@ -84,6 +84,8 @@ Here is a list of Status Code and related message:
 |502|READ_ONLY_SYSTEM_ERROR|Operating system is read only|
 |503|DISK_SPACE_INSUFFICIENT_ERROR|Disk space is insufficient|
 |504|START_UP_ERROR|Meet error while starting up|
+|505|SHUT_DOWN_ERROR|Meet error while shutdown|
+|506|MULTIPLE_ERROR|Meet error when executing multiple statements|
 |600|WRONG_LOGIN_PASSWORD_ERROR|Username or password is wrong|
 |601|NOT_LOGIN_ERROR|Has not logged in|
 |602|NO_PERMISSION_ERROR|No permissions for this operation|
diff --git a/docs/zh/UserGuide/Client/Status Codes.md 
b/docs/zh/UserGuide/Client/Status Codes.md
index de81671..f509e63 100644
--- a/docs/zh/UserGuide/Client/Status Codes.md  
+++ b/docs/zh/UserGuide/Client/Status Codes.md  
@@ -84,6 +84,8 @@ try {
 |502|READ_ONLY_SYSTEM_ERROR|系统只读|
 |503|DISK_SPACE_INSUFFICIENT_ERROR|磁盘空间不足|
 |504|START_UP_ERROR|启动错误|
+|505|SHUT_DOWN_ERROR|关机错误|
+|506|MULTIPLE_ERROR|多行语句执行错误|
 |600|WRONG_LOGIN_PASSWORD_ERROR|用户名或密码错误|
 |601|NOT_LOGIN_ERROR|没有登录|
 |602|NO_PERMISSION_ERROR|没有操作权限|
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 544ea7f..60eda4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -48,7 +48,9 @@ import 
org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -65,7 +67,6 @@ import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.slf4j.Logger;
@@ -228,6 +229,26 @@ public class StorageEngine implements IService {
   }
 
   @Override
+  public void shutdown(long millseconds) throws ShutdownException {
+    try {
+      forceCloseAllProcessor();
+    } catch (TsFileProcessorException e) {
+      throw new ShutdownException(e);
+    }
+    if (ttlCheckThread != null) {
+      ttlCheckThread.shutdownNow();
+      try {
+        ttlCheckThread.awaitTermination(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("TTL check thread still doesn't exit after 30s");
+        Thread.currentThread().interrupt();
+      }
+    }
+    recoveryThreadPool.shutdownNow();
+    this.reset();
+  }
+
+  @Override
   public ServiceType getID() {
     return ServiceType.STORAGE_ENGINE_SERVICE;
   }
@@ -318,6 +339,13 @@ public class StorageEngine implements IService {
     }
   }
 
+  public void forceCloseAllProcessor() throws TsFileProcessorException {
+    logger.info("Start closing all storage group processor");
+    for (StorageGroupProcessor processor : processorMap.values()) {
+      processor.forceCloseAllWorkingTsFileProcessors();
+    }
+  }
+
   public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
       throws StorageGroupNotSetException {
     StorageGroupProcessor processor = processorMap.get(storageGroupName);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 029126b..6f051cc 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1193,6 +1193,25 @@ public class StorageGroupProcessor {
     }
   }
 
+  public void forceCloseAllWorkingTsFileProcessors() throws 
TsFileProcessorException {
+    writeLock();
+    try {
+      logger.info("force close all processors in storage group: {}", 
storageGroupName);
+      // to avoid concurrent modification problem, we need a new array list
+      for (TsFileProcessor tsFileProcessor : new ArrayList<>(
+          workSequenceTsFileProcessors.values())) {
+        tsFileProcessor.putMemTableBackAndClose();
+      }
+      // to avoid concurrent modification problem, we need a new array list
+      for (TsFileProcessor tsFileProcessor : new ArrayList<>(
+          workUnsequenceTsFileProcessors.values())) {
+        tsFileProcessor.putMemTableBackAndClose();
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   // TODO need a read lock, please consider the concurrency with flush manager 
threads.
   public QueryDataSource query(String deviceId, String measurementId, 
QueryContext context,
       QueryFileManager filePathsManager, Filter timeFilter) throws 
QueryProcessException {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 6d90ed8..1e82581 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -779,4 +779,16 @@ public class TsFileProcessor {
   public void setTimeRangeId(long timeRangeId) {
     this.timeRangeId = timeRangeId;
   }
+
+  public void putMemTableBackAndClose() throws TsFileProcessorException {
+    if (workMemTable != null) {
+      workMemTable.release();
+      MemTablePool.getInstance().putBack(workMemTable, storageGroupName);
+    }
+    try {
+      writer.close();
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IService.java 
b/server/src/main/java/org/apache/iotdb/db/exception/ShutdownException.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/service/IService.java
copy to 
server/src/main/java/org/apache/iotdb/db/exception/ShutdownException.java
index ff6100d..1333980 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IService.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ShutdownException.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,28 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.service;
+package org.apache.iotdb.db.exception;
 
-import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-public interface IService {
+public class ShutdownException extends IoTDBException {
 
-  /**
-   * Start current service.
-   */
-  void start() throws StartupException;
 
-  /**
-   * Stop current service. If current service uses thread or thread pool,
-   * current service should guarantee to putBack thread or thread pool.
-   */
-  void stop();
+  public ShutdownException(String message, int errorCode) {
+    super(message, errorCode);
+  }
 
-  default void waitAndStop(long millseconds) {stop();}
+  public ShutdownException(Throwable cause) {
+    super(cause.getMessage(), TSStatusCode.SHUT_DOWN_ERROR.getStatusCode());
+  }
 
-  /**
-   * Get the name of the the service.
-   * @return current service name
-   */
-  ServiceType getID();
+  public ShutdownException(String message, Throwable cause, int errorCode) {
+    super(message, cause, errorCode);
+  }
+
+  public ShutdownException(Throwable cause, int errorCode) {
+    super(cause, errorCode);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 23b292b..c80ef9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -176,6 +176,7 @@ public class MManager {
     try {
       tagLogFile = new TagLogFile(config.getSchemaDir(), 
MetadataConstant.TAG_LOG);
 
+      isRecovering = true;
       initFromLog(logFile);
 
       if (config.isEnableParameterAdapter()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IService.java 
b/server/src/main/java/org/apache/iotdb/db/service/IService.java
index ff6100d..96c4918 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.exception.StartupException;
 
 public interface IService {
@@ -35,6 +36,8 @@ public interface IService {
 
   default void waitAndStop(long millseconds) {stop();}
 
+  default void shutdown(long millseconds) throws ShutdownException 
{waitAndStop(millseconds);}
+
   /**
    * Get the name of the the service.
    * @return current service name
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java 
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0988c17..eedf20d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -146,6 +146,14 @@ public class IoTDB implements IoTDBMBean {
     deactivate();
   }
 
+  public void shutdown() throws Exception {
+    logger.info("Deactivating IoTDB...");
+    MManager.getInstance().clear();
+    registerManager.shutdownAll();
+    JMXService.deregisterMBean(mbeanName);
+    logger.info("IoTDB is deactivated.");
+  }
+
   private void setUncaughtExceptionHandler() {
     Thread.setDefaultUncaughtExceptionHandler(new 
IoTDBDefaultThreadExceptionHandler());
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java 
b/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
index 25c0840..59368a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.service;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+
+import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,4 +66,17 @@ public class RegisterManager {
     iServices.clear();
     logger.info("deregister all service.");
   }
+  
+  /**
+   * stop all service and clear iService list.
+   */
+  public void shutdownAll() throws ShutdownException {
+    //we stop JMXServer at last
+    Collections.reverse(iServices);
+    for (IService service : iServices) {
+      service.shutdown(10000);
+    }
+    iServices.clear();
+    logger.info("deregister all service.");
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 57126e5..40e8fd9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -21,10 +21,11 @@ package org.apache.iotdb.db.writelog.manager;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.db.concurrent.ThreadName;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
@@ -42,11 +43,10 @@ public class MultiFileLogNodeManager implements 
WriteLogNodeManager, IService {
   private static final Logger logger = 
LoggerFactory.getLogger(MultiFileLogNodeManager.class);
   private Map<String, WriteLogNode> nodeMap;
 
-  private Thread forceThread;
+  private ScheduledExecutorService executorService;
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private final Runnable forceTask = () -> {
-      while (true) {
+  private final void forceTask(){
         if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
           logger.warn("system mode is read-only, the force flush WAL task is 
stopped");
           return;
@@ -63,15 +63,7 @@ public class MultiFileLogNodeManager implements 
WriteLogNodeManager, IService {
             logger.error("Cannot force {}, because ", node, e);
           }
         }
-        try {
-          Thread.sleep(config.getForceWalPeriodInMs());
-        } catch (InterruptedException e) {
-          logger.info("WAL force thread exits.");
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-  };
+  }
 
   private MultiFileLogNodeManager() {
     nodeMap = new ConcurrentHashMap<>();
@@ -105,18 +97,6 @@ public class MultiFileLogNodeManager implements 
WriteLogNodeManager, IService {
 
   @Override
   public void close() {
-    if (!isActivated(forceThread)) {
-      logger.debug("MultiFileLogNodeManager has not yet started");
-      return;
-    }
-    logger.info("LogNodeManager starts closing..");
-    if (isActivated(forceThread)) {
-      forceThread.interrupt();
-      logger.info("Waiting for force thread to stop");
-      while (forceThread.isAlive()) {
-        // wait for forceThread
-      }
-    }
     logger.info("{} nodes to be closed", nodeMap.size());
     for (WriteLogNode node : nodeMap.values()) {
       try {
@@ -135,14 +115,10 @@ public class MultiFileLogNodeManager implements 
WriteLogNodeManager, IService {
       if (!config.isEnableWal()) {
         return;
       }
-      if (!isActivated(forceThread)) {
-        if (config.getForceWalPeriodInMs() > 0) {
-          InstanceHolder.instance.forceThread = new 
Thread(InstanceHolder.instance.forceTask,
-              ThreadName.WAL_FORCE_DAEMON.getName());
-          InstanceHolder.instance.forceThread.start();
-        }
-      } else {
-        logger.debug("MultiFileLogNodeManager has already started");
+      if (config.getForceWalPeriodInMs() > 0) {
+        executorService = Executors.newSingleThreadScheduledExecutor();
+        executorService.scheduleAtFixedRate(this::forceTask, 
config.getForceWalPeriodInMs(),
+            config.getForceWalPeriodInMs(), TimeUnit.MILLISECONDS);
       }
     } catch (Exception e) {
       throw new StartupException(this.getID().getName(), e.getMessage());
@@ -154,6 +130,15 @@ public class MultiFileLogNodeManager implements 
WriteLogNodeManager, IService {
     if (!config.isEnableWal()) {
       return;
     }
+    if (executorService != null) {
+      executorService.shutdown();
+      try {
+        executorService.awaitTermination(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("force flush wal thread still doesn't exit after 30s");
+        Thread.currentThread().interrupt();
+      }
+    }
     close();
   }
 
@@ -162,10 +147,6 @@ public class MultiFileLogNodeManager implements 
WriteLogNodeManager, IService {
     return ServiceType.WAL_SERVICE;
   }
 
-  private boolean isActivated(Thread thread) {
-    return thread != null && thread.isAlive();
-  }
-
   private static class InstanceHolder {
     private InstanceHolder(){}
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java 
b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index 0fd3129..a93117b 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -83,4 +83,5 @@ public interface WriteLogNode {
    * @return an ILogReader which can iterate each log in this log node.
    */
   ILogReader getLogReader();
+
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRecoverUnclosedIT.java
 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRecoverUnclosedIT.java
new file mode 100644
index 0000000..ec9f97d
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRecoverUnclosedIT.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.max_value;
+import static org.apache.iotdb.db.constant.TestConstant.min_time;
+import static org.apache.iotdb.db.constant.TestConstant.min_value;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBRecoverUnclosedIT {
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final String TEMPERATURE_STR = 
"root.ln.wf01.wt01.temperature";
+  private static String[] creationSqls = new String[]{
+      "SET STORAGE GROUP TO root.vehicle.d0",
+      "SET STORAGE GROUP TO root.vehicle.d1",
+
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN"
+  };
+  private static String[] dataSet2 = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, 
ENCODING=PLAIN",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(1, 1.1, false, 11)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(2, 2.2, true, 22)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(3, 3.3, false, 33 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(4, 4.4, false, 44)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(5, 5.5, false, 55)"
+  };
+  private final String d0s0 = "root.vehicle.d0.s0";
+  private final String d0s1 = "root.vehicle.d0.s1";
+  private final String d0s2 = "root.vehicle.d0.s2";
+  private final String d0s3 = "root.vehicle.d0.s3";
+  private String insertTemplate = "INSERT INTO 
root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)"
+      + " VALUES(%d,%d,%d,%f,%s,%s)";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() throws SQLException, IOException, StartupException {
+    String[] retArray = new String[]{
+        "0,2",
+        "0,4",
+        "0,3"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute(
+          "select count(temperature) from root.ln.wf01.wt01 where time > 3");
+
+      Assert.assertTrue(hasResultSet);
+      int cnt;
+      try (ResultSet resultSet = statement.getResultSet();) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+              resultSet.getString(count(TEMPERATURE_STR));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+
+      hasResultSet = statement.execute(
+          "select min_time(temperature) from root.ln.wf01.wt01 where time > 
3");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+              resultSet.getString(min_time(TEMPERATURE_STR));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(2, cnt);
+      }
+
+      hasResultSet = statement.execute(
+          "select min_time(temperature) from root.ln.wf01.wt01 where 
temperature > 3");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+              resultSet.getString(min_time(TEMPERATURE_STR));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(3, cnt);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+    insertMoreData();
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+
+    // maxminValueTest
+    retArray = new String[]{
+        "0,8499,500.0",
+        "0,2499,500.0"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select 
max_value(s0),min_value(s2) " +
+          "from root.vehicle.d0 where time >= 100 and time < 9000");
+
+      Assert.assertTrue(hasResultSet);
+      int cnt;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(max_value(d0s0))
+                  + "," + resultSet.getString(min_value(d0s2));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+
+      hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+          "from root.vehicle.d0 where time < 2500");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(max_value(d0s0))
+                  + "," + resultSet.getString(min_value(d0s2));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(2, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void prepareData() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      for (String sql : dataSet2) {
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void insertMoreData() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+
+      // prepare BufferWrite file
+      for (int i = 5000; i < 7000; i++) {
+        statement.execute(String
+            .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'" 
+ i + "\'", "true"));
+      }
+      for (int i = 7500; i < 8500; i++) {
+        statement.execute(String
+            .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'" 
+ i + "\'", "false"));
+      }
+      // prepare Unseq-File
+      for (int i = 500; i < 1500; i++) {
+        statement.execute(String
+            .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'" 
+ i + "\'", "true"));
+      }
+      for (int i = 3000; i < 6500; i++) {
+        statement.execute(String
+            .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'" 
+ i + "\'", "false"));
+      }
+
+      // prepare BufferWrite cache
+      for (int i = 9000; i < 10000; i++) {
+        statement.execute(String
+            .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'" 
+ i + "\'", "true"));
+      }
+      // prepare Overflow cache
+      for (int i = 2000; i < 2500; i++) {
+        statement.execute(String
+            .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'" 
+ i + "\'", "false"));
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index e0a657e..e0b84f0 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -31,6 +31,7 @@ import java.sql.Statement;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class IoTDBRestartIT {
@@ -41,7 +42,7 @@ public class IoTDBRestartIT {
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
 
-    try(Connection connection = DriverManager
+    try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
             "root");
         Statement statement = connection.createStatement()){
@@ -49,18 +50,26 @@ public class IoTDBRestartIT {
       statement.execute("flush");
     }
 
-    EnvironmentUtils.restartDaemon();
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
 
-    try(Connection connection = DriverManager
+    try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
             "root");
         Statement statement = connection.createStatement()){
       statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,1.0)");
     }
 
-    EnvironmentUtils.restartDaemon();
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
 
-    try(Connection connection = DriverManager
+    try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
             "root");
         Statement statement = connection.createStatement()){
@@ -92,7 +101,7 @@ public class IoTDBRestartIT {
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
 
-    try(Connection connection = DriverManager
+    try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
             "root");
         Statement statement = connection.createStatement()){
@@ -101,9 +110,13 @@ public class IoTDBRestartIT {
       statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(3,3)");
     }
 
-    EnvironmentUtils.restartDaemon();
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
 
-    try(Connection connection = DriverManager
+    try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
             "root");
         Statement statement = connection.createStatement()){
@@ -142,4 +155,123 @@ public class IoTDBRestartIT {
 
     EnvironmentUtils.cleanEnv();
   }
+
+  @Test
+  public void testRestartQueryLargerThanEndTime()
+      throws SQLException, ClassNotFoundException, IOException, 
StorageEngineException {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,2)");
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(3,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(4,2)");
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.turbine.d1 
where time > 3");
+      assertTrue(hasResultSet);
+      String[] exp = new String[]{
+          "4,2.0",
+      };
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+        assertEquals(exp[cnt], result);
+        cnt++;
+      }
+      assertEquals(1, cnt);
+
+    }
+
+    EnvironmentUtils.cleanEnv();
+  }
+
+
+
+  @Test
+  public void testRestartEndTime()
+      throws SQLException, ClassNotFoundException, IOException, 
StorageEngineException {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,2)");
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s2) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s2) 
values(2,2)");
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("SELECT s2 FROM 
root.turbine.d1");
+      assertTrue(hasResultSet);
+      String[] exp = new String[]{
+          "1,1.0",
+          "2,2.0"
+      };
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+        assertEquals(exp[cnt], result);
+        cnt++;
+      }
+      assertEquals(2, cnt);
+
+    }
+
+    EnvironmentUtils.cleanEnv();
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java 
b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 6e1ff67..9ae753a 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -213,6 +213,12 @@ public class EnvironmentUtils {
     }
   }
 
+  public static void shutdownDaemon() throws Exception {
+    if(daemon != null) {
+      daemon.shutdown();
+    }
+  }
+
   public static void activeDaemon() {
     if(daemon != null) {
       daemon.active();
@@ -228,8 +234,8 @@ public class EnvironmentUtils {
     }
   }
 
-  public static void restartDaemon() {
-    stopDaemon();
+  public static void restartDaemon() throws Exception {
+    shutdownDaemon();
     reactiveDaemon();
   }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 48a064f..08aa3ad 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -63,7 +63,8 @@ public enum TSStatusCode {
   READ_ONLY_SYSTEM_ERROR(502),
   DISK_SPACE_INSUFFICIENT_ERROR(503),
   START_UP_ERROR(504),
-  MULTIPLE_ERROR(505),
+  SHUT_DOWN_ERROR(505),
+  MULTIPLE_ERROR(506),
 
   WRONG_LOGIN_PASSWORD_ERROR(600),
   NOT_LOGIN_ERROR(601),

Reply via email to