This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 209c66c Add force stop storage engine interface (#1289)
209c66c is described below
commit 209c66c9426c5549c8e0803ad99ba5d3b75ba907
Author: Jialin Qiao <[email protected]>
AuthorDate: Fri Jun 19 15:58:34 2020 +0800
Add force stop storage engine interface (#1289)
* add testStop for restart IT
Co-authored-by: HTHou <[email protected]>
---
docs/UserGuide/Client/Status Codes.md | 2 +
docs/zh/UserGuide/Client/Status Codes.md | 2 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 30 ++-
.../engine/storagegroup/StorageGroupProcessor.java | 19 ++
.../db/engine/storagegroup/TsFileProcessor.java | 12 +
.../ShutdownException.java} | 36 ++-
.../org/apache/iotdb/db/metadata/MManager.java | 1 +
.../java/org/apache/iotdb/db/service/IService.java | 3 +
.../java/org/apache/iotdb/db/service/IoTDB.java | 8 +
.../apache/iotdb/db/service/RegisterManager.java | 15 ++
.../writelog/manager/MultiFileLogNodeManager.java | 57 ++---
.../iotdb/db/writelog/node/WriteLogNode.java | 1 +
.../db/integration/IoTDBRecoverUnclosedIT.java | 270 +++++++++++++++++++++
.../iotdb/db/integration/IoTDBRestartIT.java | 148 ++++++++++-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 10 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
16 files changed, 548 insertions(+), 69 deletions(-)
diff --git a/docs/UserGuide/Client/Status Codes.md
b/docs/UserGuide/Client/Status Codes.md
index eafdd0b..55245a4 100644
--- a/docs/UserGuide/Client/Status Codes.md
+++ b/docs/UserGuide/Client/Status Codes.md
@@ -84,6 +84,8 @@ Here is a list of Status Code and related message:
|502|READ_ONLY_SYSTEM_ERROR|Operating system is read only|
|503|DISK_SPACE_INSUFFICIENT_ERROR|Disk space is insufficient|
|504|START_UP_ERROR|Meet error while starting up|
+|505|SHUT_DOWN_ERROR|Meet error while shutdown|
+|506|MULTIPLE_ERROR|Meet error when executing multiple statements|
|600|WRONG_LOGIN_PASSWORD_ERROR|Username or password is wrong|
|601|NOT_LOGIN_ERROR|Has not logged in|
|602|NO_PERMISSION_ERROR|No permissions for this operation|
diff --git a/docs/zh/UserGuide/Client/Status Codes.md
b/docs/zh/UserGuide/Client/Status Codes.md
index de81671..f509e63 100644
--- a/docs/zh/UserGuide/Client/Status Codes.md
+++ b/docs/zh/UserGuide/Client/Status Codes.md
@@ -84,6 +84,8 @@ try {
|502|READ_ONLY_SYSTEM_ERROR|系统只读|
|503|DISK_SPACE_INSUFFICIENT_ERROR|磁盘空间不足|
|504|START_UP_ERROR|启动错误|
+|505|SHUT_DOWN_ERROR|关机错误|
+|506|MULTIPLE_ERROR|多行语句执行错误|
|600|WRONG_LOGIN_PASSWORD_ERROR|用户名或密码错误|
|601|NOT_LOGIN_ERROR|没有登录|
|602|NO_PERMISSION_ERROR|没有操作权限|
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 544ea7f..60eda4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -48,7 +48,9 @@ import
org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -65,7 +67,6 @@ import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.slf4j.Logger;
@@ -228,6 +229,26 @@ public class StorageEngine implements IService {
}
@Override
+ public void shutdown(long millseconds) throws ShutdownException {
+ try {
+ forceCloseAllProcessor();
+ } catch (TsFileProcessorException e) {
+ throw new ShutdownException(e);
+ }
+ if (ttlCheckThread != null) {
+ ttlCheckThread.shutdownNow();
+ try {
+ ttlCheckThread.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("TTL check thread still doesn't exit after 30s");
+ Thread.currentThread().interrupt();
+ }
+ }
+ recoveryThreadPool.shutdownNow();
+ this.reset();
+ }
+
+ @Override
public ServiceType getID() {
return ServiceType.STORAGE_ENGINE_SERVICE;
}
@@ -318,6 +339,13 @@ public class StorageEngine implements IService {
}
}
+ public void forceCloseAllProcessor() throws TsFileProcessorException {
+ logger.info("Start closing all storage group processor");
+ for (StorageGroupProcessor processor : processorMap.values()) {
+ processor.forceCloseAllWorkingTsFileProcessors();
+ }
+ }
+
public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
throws StorageGroupNotSetException {
StorageGroupProcessor processor = processorMap.get(storageGroupName);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 029126b..6f051cc 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1193,6 +1193,25 @@ public class StorageGroupProcessor {
}
}
+ public void forceCloseAllWorkingTsFileProcessors() throws
TsFileProcessorException {
+ writeLock();
+ try {
+ logger.info("force close all processors in storage group: {}",
storageGroupName);
+ // to avoid concurrent modification problem, we need a new array list
+ for (TsFileProcessor tsFileProcessor : new ArrayList<>(
+ workSequenceTsFileProcessors.values())) {
+ tsFileProcessor.putMemTableBackAndClose();
+ }
+ // to avoid concurrent modification problem, we need a new array list
+ for (TsFileProcessor tsFileProcessor : new ArrayList<>(
+ workUnsequenceTsFileProcessors.values())) {
+ tsFileProcessor.putMemTableBackAndClose();
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
// TODO need a read lock, please consider the concurrency with flush manager
threads.
public QueryDataSource query(String deviceId, String measurementId,
QueryContext context,
QueryFileManager filePathsManager, Filter timeFilter) throws
QueryProcessException {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 6d90ed8..1e82581 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -779,4 +779,16 @@ public class TsFileProcessor {
public void setTimeRangeId(long timeRangeId) {
this.timeRangeId = timeRangeId;
}
+
+ public void putMemTableBackAndClose() throws TsFileProcessorException {
+ if (workMemTable != null) {
+ workMemTable.release();
+ MemTablePool.getInstance().putBack(workMemTable, storageGroupName);
+ }
+ try {
+ writer.close();
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IService.java
b/server/src/main/java/org/apache/iotdb/db/exception/ShutdownException.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/service/IService.java
copy to
server/src/main/java/org/apache/iotdb/db/exception/ShutdownException.java
index ff6100d..1333980 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IService.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ShutdownException.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,28 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.service;
+package org.apache.iotdb.db.exception;
-import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.rpc.TSStatusCode;
-public interface IService {
+public class ShutdownException extends IoTDBException {
- /**
- * Start current service.
- */
- void start() throws StartupException;
- /**
- * Stop current service. If current service uses thread or thread pool,
- * current service should guarantee to putBack thread or thread pool.
- */
- void stop();
+ public ShutdownException(String message, int errorCode) {
+ super(message, errorCode);
+ }
- default void waitAndStop(long millseconds) {stop();}
+ public ShutdownException(Throwable cause) {
+ super(cause.getMessage(), TSStatusCode.SHUT_DOWN_ERROR.getStatusCode());
+ }
- /**
- * Get the name of the the service.
- * @return current service name
- */
- ServiceType getID();
+ public ShutdownException(String message, Throwable cause, int errorCode) {
+ super(message, cause, errorCode);
+ }
+
+ public ShutdownException(Throwable cause, int errorCode) {
+ super(cause, errorCode);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 23b292b..c80ef9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -176,6 +176,7 @@ public class MManager {
try {
tagLogFile = new TagLogFile(config.getSchemaDir(),
MetadataConstant.TAG_LOG);
+ isRecovering = true;
initFromLog(logFile);
if (config.isEnableParameterAdapter()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IService.java
b/server/src/main/java/org/apache/iotdb/db/service/IService.java
index ff6100d..96c4918 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IService.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.service;
+import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.exception.StartupException;
public interface IService {
@@ -35,6 +36,8 @@ public interface IService {
default void waitAndStop(long millseconds) {stop();}
+ default void shutdown(long millseconds) throws ShutdownException
{waitAndStop(millseconds);}
+
/**
* Get the name of the the service.
* @return current service name
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0988c17..eedf20d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -146,6 +146,14 @@ public class IoTDB implements IoTDBMBean {
deactivate();
}
+ public void shutdown() throws Exception {
+ logger.info("Deactivating IoTDB...");
+ MManager.getInstance().clear();
+ registerManager.shutdownAll();
+ JMXService.deregisterMBean(mbeanName);
+ logger.info("IoTDB is deactivated.");
+ }
+
private void setUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(new
IoTDBDefaultThreadExceptionHandler());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
b/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
index 25c0840..59368a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+
+import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.exception.StartupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,4 +66,17 @@ public class RegisterManager {
iServices.clear();
logger.info("deregister all service.");
}
+
+ /**
+ * stop all service and clear iService list.
+ */
+ public void shutdownAll() throws ShutdownException {
+ //we stop JMXServer at last
+ Collections.reverse(iServices);
+ for (IService service : iServices) {
+ service.shutdown(10000);
+ }
+ iServices.clear();
+ logger.info("deregister all service.");
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 57126e5..40e8fd9 100644
---
a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -21,10 +21,11 @@ package org.apache.iotdb.db.writelog.manager;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.db.concurrent.ThreadName;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
@@ -42,11 +43,10 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
private static final Logger logger =
LoggerFactory.getLogger(MultiFileLogNodeManager.class);
private Map<String, WriteLogNode> nodeMap;
- private Thread forceThread;
+ private ScheduledExecutorService executorService;
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final Runnable forceTask = () -> {
- while (true) {
+ private final void forceTask(){
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
logger.warn("system mode is read-only, the force flush WAL task is
stopped");
return;
@@ -63,15 +63,7 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
logger.error("Cannot force {}, because ", node, e);
}
}
- try {
- Thread.sleep(config.getForceWalPeriodInMs());
- } catch (InterruptedException e) {
- logger.info("WAL force thread exits.");
- Thread.currentThread().interrupt();
- break;
- }
- }
- };
+ }
private MultiFileLogNodeManager() {
nodeMap = new ConcurrentHashMap<>();
@@ -105,18 +97,6 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
@Override
public void close() {
- if (!isActivated(forceThread)) {
- logger.debug("MultiFileLogNodeManager has not yet started");
- return;
- }
- logger.info("LogNodeManager starts closing..");
- if (isActivated(forceThread)) {
- forceThread.interrupt();
- logger.info("Waiting for force thread to stop");
- while (forceThread.isAlive()) {
- // wait for forceThread
- }
- }
logger.info("{} nodes to be closed", nodeMap.size());
for (WriteLogNode node : nodeMap.values()) {
try {
@@ -135,14 +115,10 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
if (!config.isEnableWal()) {
return;
}
- if (!isActivated(forceThread)) {
- if (config.getForceWalPeriodInMs() > 0) {
- InstanceHolder.instance.forceThread = new
Thread(InstanceHolder.instance.forceTask,
- ThreadName.WAL_FORCE_DAEMON.getName());
- InstanceHolder.instance.forceThread.start();
- }
- } else {
- logger.debug("MultiFileLogNodeManager has already started");
+ if (config.getForceWalPeriodInMs() > 0) {
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ executorService.scheduleAtFixedRate(this::forceTask,
config.getForceWalPeriodInMs(),
+ config.getForceWalPeriodInMs(), TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
throw new StartupException(this.getID().getName(), e.getMessage());
@@ -154,6 +130,15 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
if (!config.isEnableWal()) {
return;
}
+ if (executorService != null) {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("force flush wal thread still doesn't exit after 30s");
+ Thread.currentThread().interrupt();
+ }
+ }
close();
}
@@ -162,10 +147,6 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
return ServiceType.WAL_SERVICE;
}
- private boolean isActivated(Thread thread) {
- return thread != null && thread.isAlive();
- }
-
private static class InstanceHolder {
private InstanceHolder(){}
diff --git
a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index 0fd3129..a93117b 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -83,4 +83,5 @@ public interface WriteLogNode {
* @return an ILogReader which can iterate each log in this log node.
*/
ILogReader getLogReader();
+
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRecoverUnclosedIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRecoverUnclosedIT.java
new file mode 100644
index 0000000..ec9f97d
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRecoverUnclosedIT.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.max_value;
+import static org.apache.iotdb.db.constant.TestConstant.min_time;
+import static org.apache.iotdb.db.constant.TestConstant.min_value;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBRecoverUnclosedIT {
+
+ private static final String TIMESTAMP_STR = "Time";
+ private static final String TEMPERATURE_STR =
"root.ln.wf01.wt01.temperature";
+ private static String[] creationSqls = new String[]{
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "SET STORAGE GROUP TO root.vehicle.d1",
+
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN,
ENCODING=PLAIN"
+ };
+ private static String[] dataSet2 = new String[]{
+ "SET STORAGE GROUP TO root.ln.wf01.wt01",
+ "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32,
ENCODING=PLAIN",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(1, 1.1, false, 11)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(2, 2.2, true, 22)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(3, 3.3, false, 33 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(4, 4.4, false, 44)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(5, 5.5, false, 55)"
+ };
+ private final String d0s0 = "root.vehicle.d0.s0";
+ private final String d0s1 = "root.vehicle.d0.s1";
+ private final String d0s2 = "root.vehicle.d0.s2";
+ private final String d0s3 = "root.vehicle.d0.s3";
+ private String insertTemplate = "INSERT INTO
root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)"
+ + " VALUES(%d,%d,%d,%f,%s,%s)";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void test() throws SQLException, IOException, StartupException {
+ String[] retArray = new String[]{
+ "0,2",
+ "0,4",
+ "0,3"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute(
+ "select count(temperature) from root.ln.wf01.wt01 where time > 3");
+
+ Assert.assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet();) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+ resultSet.getString(count(TEMPERATURE_STR));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+
+ hasResultSet = statement.execute(
+ "select min_time(temperature) from root.ln.wf01.wt01 where time >
3");
+
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+ resultSet.getString(min_time(TEMPERATURE_STR));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ }
+
+ hasResultSet = statement.execute(
+ "select min_time(temperature) from root.ln.wf01.wt01 where
temperature > 3");
+
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+ resultSet.getString(min_time(TEMPERATURE_STR));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(3, cnt);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ insertMoreData();
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ // maxminValueTest
+ retArray = new String[]{
+ "0,8499,500.0",
+ "0,2499,500.0"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select
max_value(s0),min_value(s2) " +
+ "from root.vehicle.d0 where time >= 100 and time < 9000");
+
+ Assert.assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(max_value(d0s0))
+ + "," + resultSet.getString(min_value(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+
+ hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+ "from root.vehicle.d0 where time < 2500");
+
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(max_value(d0s0))
+ + "," + resultSet.getString(min_value(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private void prepareData() throws SQLException {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+ for (String sql : creationSqls) {
+ statement.execute(sql);
+ }
+
+ for (String sql : dataSet2) {
+ statement.execute(sql);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void insertMoreData() throws SQLException {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+
+ // prepare BufferWrite file
+ for (int i = 5000; i < 7000; i++) {
+ statement.execute(String
+ .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'"
+ i + "\'", "true"));
+ }
+ for (int i = 7500; i < 8500; i++) {
+ statement.execute(String
+ .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'"
+ i + "\'", "false"));
+ }
+ // prepare Unseq-File
+ for (int i = 500; i < 1500; i++) {
+ statement.execute(String
+ .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'"
+ i + "\'", "true"));
+ }
+ for (int i = 3000; i < 6500; i++) {
+ statement.execute(String
+ .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'"
+ i + "\'", "false"));
+ }
+
+ // prepare BufferWrite cache
+ for (int i = 9000; i < 10000; i++) {
+ statement.execute(String
+ .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'"
+ i + "\'", "true"));
+ }
+ // prepare Overflow cache
+ for (int i = 2000; i < 2500; i++) {
+ statement.execute(String
+ .format(Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "\'"
+ i + "\'", "false"));
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index e0a657e..e0b84f0 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -31,6 +31,7 @@ import java.sql.Statement;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
+import org.junit.Assert;
import org.junit.Test;
public class IoTDBRestartIT {
@@ -41,7 +42,7 @@ public class IoTDBRestartIT {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
- try(Connection connection = DriverManager
+ try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
@@ -49,18 +50,26 @@ public class IoTDBRestartIT {
statement.execute("flush");
}
- EnvironmentUtils.restartDaemon();
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
- try(Connection connection = DriverManager
+ try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
statement.execute("insert into root.turbine.d1(timestamp,s1)
values(2,1.0)");
}
- EnvironmentUtils.restartDaemon();
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
- try(Connection connection = DriverManager
+ try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
@@ -92,7 +101,7 @@ public class IoTDBRestartIT {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
- try(Connection connection = DriverManager
+ try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
@@ -101,9 +110,13 @@ public class IoTDBRestartIT {
statement.execute("insert into root.turbine.d1(timestamp,s1)
values(3,3)");
}
- EnvironmentUtils.restartDaemon();
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
- try(Connection connection = DriverManager
+ try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
@@ -142,4 +155,123 @@ public class IoTDBRestartIT {
EnvironmentUtils.cleanEnv();
}
+
+ @Test
+ public void testRestartQueryLargerThanEndTime()
+ throws SQLException, ClassNotFoundException, IOException,
StorageEngineException {
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("insert into root.turbine.d1(timestamp,s1)
values(1,1)");
+ statement.execute("insert into root.turbine.d1(timestamp,s1)
values(2,2)");
+ }
+
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.turbine.d1(timestamp,s1)
values(3,1)");
+ statement.execute("insert into root.turbine.d1(timestamp,s1)
values(4,2)");
+ }
+
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("SELECT s1 FROM root.turbine.d1
where time > 3");
+ assertTrue(hasResultSet);
+ String[] exp = new String[]{
+ "4,2.0",
+ };
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String result = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(2);
+ assertEquals(exp[cnt], result);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+
+ }
+
+ EnvironmentUtils.cleanEnv();
+ }
+
+
+
+ @Test
+ public void testRestartEndTime()
+ throws SQLException, ClassNotFoundException, IOException,
StorageEngineException {
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("insert into root.turbine.d1(timestamp,s1)
values(1,1)");
+ statement.execute("insert into root.turbine.d1(timestamp,s1)
values(2,2)");
+ }
+
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.turbine.d1(timestamp,s2)
values(1,1)");
+ statement.execute("insert into root.turbine.d1(timestamp,s2)
values(2,2)");
+ }
+
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("SELECT s2 FROM
root.turbine.d1");
+ assertTrue(hasResultSet);
+ String[] exp = new String[]{
+ "1,1.0",
+ "2,2.0"
+ };
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String result = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(2);
+ assertEquals(exp[cnt], result);
+ cnt++;
+ }
+ assertEquals(2, cnt);
+
+ }
+
+ EnvironmentUtils.cleanEnv();
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 6e1ff67..9ae753a 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -213,6 +213,12 @@ public class EnvironmentUtils {
}
}
+ public static void shutdownDaemon() throws Exception {
+ if(daemon != null) {
+ daemon.shutdown();
+ }
+ }
+
public static void activeDaemon() {
if(daemon != null) {
daemon.active();
@@ -228,8 +234,8 @@ public class EnvironmentUtils {
}
}
- public static void restartDaemon() {
- stopDaemon();
+ public static void restartDaemon() throws Exception {
+ shutdownDaemon();
reactiveDaemon();
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 48a064f..08aa3ad 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -63,7 +63,8 @@ public enum TSStatusCode {
READ_ONLY_SYSTEM_ERROR(502),
DISK_SPACE_INSUFFICIENT_ERROR(503),
START_UP_ERROR(504),
- MULTIPLE_ERROR(505),
+ SHUT_DOWN_ERROR(505),
+ MULTIPLE_ERROR(506),
WRONG_LOGIN_PASSWORD_ERROR(600),
NOT_LOGIN_ERROR(601),