This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 822b6dd  [IOTDB-651] Last query bug (#1172)
822b6dd is described below

commit 822b6dd3c906fb7cb223958e90ab111e3bad938d
Author: Jackie Tien <[email protected]>
AuthorDate: Fri May 8 18:18:25 2020 +0800

    [IOTDB-651] Last query bug (#1172)
    
    * Last query bug and memory leak
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 110 +++++++++++++++------
 1 file changed, 81 insertions(+), 29 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index bfce48f..69eb575 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,24 @@
  */
 package org.apache.iotdb.db.service;
 
+import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
+import static 
org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -42,9 +60,19 @@ import org.apache.iotdb.db.qp.executor.IPlanExecutor;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.NonAlignEngineDataSet;
@@ -55,7 +83,34 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -68,18 +123,6 @@ import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.time.ZoneId;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
-import static 
org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
-
 
 /**
  * Thrift RPC implementation at server side.
@@ -351,7 +394,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       }
     } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
-      return 
RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
+      return RpcUtils
+          .getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, 
t1);
     }
@@ -393,12 +437,13 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (QueryInBatchStatementException e) {
       logger.info("Error occurred when executing {}, query statement not 
allowed: ", statement, e);
       result.add(
-          RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "query statement 
not allowed: " + statement));
+          RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED,
+              "query statement not allowed: " + statement));
       return false;
-    }  catch (Exception e) {
+    } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       result.add(RpcUtils.getStatus(
-              TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + 
e.getMessage()));
+          TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + 
e.getMessage()));
     }
     return true;
   }
@@ -415,8 +460,9 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(statement, 
sessionIdZoneIdMap.get(req.getSessionId()));
       if (physicalPlan.isQuery()) {
-        return internalExecuteQueryStatement(statement, req.statementId, 
physicalPlan, req.fetchSize,
-                sessionIdUsernameMap.get(req.getSessionId()));
+        return internalExecuteQueryStatement(statement, req.statementId, 
physicalPlan,
+            req.fetchSize,
+            sessionIdUsernameMap.get(req.getSessionId()));
       } else {
         return executeUpdateStatement(physicalPlan, req.getSessionId());
       }
@@ -475,7 +521,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getTSExecuteStatementResp(
-              RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage()));
+          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage()));
     }
   }
 
@@ -513,13 +559,16 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
       // create and cache dataset
       QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
-      if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime() && 
newDataSet instanceof NonAlignEngineDataSet) {
+      if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()
+          && newDataSet instanceof NonAlignEngineDataSet) {
         TSQueryNonAlignDataSet result = fillRpcNonAlignReturnData(fetchSize, 
newDataSet, username);
         resp.setNonAlignQueryDataSet(result);
       } else {
         if (plan instanceof ShowPlan && ((ShowPlan) plan).getShowContentType() 
== TIMESERIES) {
-          
resp.setColumns(newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
-          
resp.setDataTypeList(newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+          resp.setColumns(
+              
newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
+          resp.setDataTypeList(
+              
newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
         }
         TSQueryDataSet result = fillRpcReturnData(fetchSize, newDataSet, 
username);
         resp.setQueryDataSet(result);
@@ -640,7 +689,9 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     if (plan instanceof AlignByDevicePlan) {
       getAlignByDeviceQueryHeaders((AlignByDevicePlan) plan, respColumns, 
columnsTypes);
     } else if (plan instanceof LastQueryPlan) {
-      return StaticResps.LAST_RESP;
+      // Last Query should return different respond instead of the static one
+      // because the query dataset and query id is different although the 
header of last query is same.
+      return StaticResps.LAST_RESP.deepCopy();
     } else {
       getWideQueryHeaders(plan, respColumns, columnsTypes);
       resp.setColumnNameIndexMap(plan.getPathToIndex());
@@ -1153,7 +1204,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
             QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), 
req.sizeList.get(i)));
         insertTabletPlan.setColumns(
             QueryDataSetUtils.readValuesFromBuffer(
-                req.valuesList.get(i), req.typesList.get(i), 
req.measurementsList.get(i).size(), req.sizeList.get(i)));
+                req.valuesList.get(i), req.typesList.get(i), 
req.measurementsList.get(i).size(),
+                req.sizeList.get(i)));
         insertTabletPlan.setRowCount(req.sizeList.get(i));
         insertTabletPlan.setDataTypes(req.typesList.get(i));
 
@@ -1340,7 +1392,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (AuthException e) {
       logger.error("meet error while checking authorization.", e);
       return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, 
e.getMessage());
-    }  catch (Exception e) {
+    } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
     }
@@ -1354,7 +1406,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (QueryProcessException e) {
       logger.debug("meet error while processing non-query. ", e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
-    }  catch (Exception e) {
+    } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
     }

Reply via email to