This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 822b6dd [IOTDB-651] Last query bug (#1172)
822b6dd is described below
commit 822b6dd3c906fb7cb223958e90ab111e3bad938d
Author: Jackie Tien <[email protected]>
AuthorDate: Fri May 8 18:18:25 2020 +0800
[IOTDB-651] Last query bug (#1172)
* Last query bug and memory leak
---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 110 +++++++++++++++------
1 file changed, 81 insertions(+), 29 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index bfce48f..69eb575 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,24 @@
*/
package org.apache.iotdb.db.service;
+import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
+import static
org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -42,9 +60,19 @@ import org.apache.iotdb.db.qp.executor.IPlanExecutor;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.NonAlignEngineDataSet;
@@ -55,7 +83,34 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -68,18 +123,6 @@ import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.time.ZoneId;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
-import static
org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
-
/**
* Thrift RPC implementation at server side.
@@ -351,7 +394,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
} catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
- return
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
+ return RpcUtils
+ .getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH,
t1);
}
@@ -393,12 +437,13 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
} catch (QueryInBatchStatementException e) {
logger.info("Error occurred when executing {}, query statement not
allowed: ", statement, e);
result.add(
- RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "query statement
not allowed: " + statement));
+ RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED,
+ "query statement not allowed: " + statement));
return false;
- } catch (Exception e) {
+ } catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
result.add(RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " +
e.getMessage()));
+ TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " +
e.getMessage()));
}
return true;
}
@@ -415,8 +460,9 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement,
sessionIdZoneIdMap.get(req.getSessionId()));
if (physicalPlan.isQuery()) {
- return internalExecuteQueryStatement(statement, req.statementId,
physicalPlan, req.fetchSize,
- sessionIdUsernameMap.get(req.getSessionId()));
+ return internalExecuteQueryStatement(statement, req.statementId,
physicalPlan,
+ req.fetchSize,
+ sessionIdUsernameMap.get(req.getSessionId()));
} else {
return executeUpdateStatement(physicalPlan, req.getSessionId());
}
@@ -475,7 +521,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
} catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage()));
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage()));
}
}
@@ -513,13 +559,16 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
- if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime() &&
newDataSet instanceof NonAlignEngineDataSet) {
+ if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()
+ && newDataSet instanceof NonAlignEngineDataSet) {
TSQueryNonAlignDataSet result = fillRpcNonAlignReturnData(fetchSize,
newDataSet, username);
resp.setNonAlignQueryDataSet(result);
} else {
if (plan instanceof ShowPlan && ((ShowPlan) plan).getShowContentType()
== TIMESERIES) {
-
resp.setColumns(newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
-
resp.setDataTypeList(newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+ resp.setColumns(
+
newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
+ resp.setDataTypeList(
+
newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
}
TSQueryDataSet result = fillRpcReturnData(fetchSize, newDataSet,
username);
resp.setQueryDataSet(result);
@@ -640,7 +689,9 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
if (plan instanceof AlignByDevicePlan) {
getAlignByDeviceQueryHeaders((AlignByDevicePlan) plan, respColumns,
columnsTypes);
} else if (plan instanceof LastQueryPlan) {
- return StaticResps.LAST_RESP;
+ // A last query must return its own response instead of the shared static one,
+ // because the query dataset and query id differ for each query even though the
header of every last query is the same.
+ return StaticResps.LAST_RESP.deepCopy();
} else {
getWideQueryHeaders(plan, respColumns, columnsTypes);
resp.setColumnNameIndexMap(plan.getPathToIndex());
@@ -1153,7 +1204,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i),
req.sizeList.get(i)));
insertTabletPlan.setColumns(
QueryDataSetUtils.readValuesFromBuffer(
- req.valuesList.get(i), req.typesList.get(i),
req.measurementsList.get(i).size(), req.sizeList.get(i)));
+ req.valuesList.get(i), req.typesList.get(i),
req.measurementsList.get(i).size(),
+ req.sizeList.get(i)));
insertTabletPlan.setRowCount(req.sizeList.get(i));
insertTabletPlan.setDataTypes(req.typesList.get(i));
@@ -1340,7 +1392,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
} catch (AuthException e) {
logger.error("meet error while checking authorization.", e);
return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR,
e.getMessage());
- } catch (Exception e) {
+ } catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
}
@@ -1354,7 +1406,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
} catch (QueryProcessException e) {
logger.debug("meet error while processing non-query. ", e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (Exception e) {
+ } catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
}