This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a52c63d5ab7be97bf4bdd009aa66c783a7f2b5ed
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jan 14 15:56:58 2020 +0800

    s
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 210 +++++++++------------
 1 file changed, 86 insertions(+), 124 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f4c4ca..78c0082 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,27 +18,6 @@
  */
 package org.apache.iotdb.db.service;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -67,12 +46,7 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
@@ -81,34 +55,7 @@ import 
org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
+import org.apache.iotdb.service.rpc.thrift.*;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -121,6 +68,17 @@ import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
 /**
  * Thrift RPC implementation at server side.
  */
@@ -196,7 +154,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
           return TSDataType.DOUBLE;
         default:
           throw new QueryProcessException(
-              "aggregate does not support " + aggrType + " function.");
+                  "aggregate does not support " + aggrType + " function.");
       }
     }
     return MManager.getInstance().getSeriesType(path);
@@ -205,7 +163,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException 
{
     logger.info("{}: receive open session request from username {}", 
IoTDBConstant.GLOBAL_DB_NAME,
-        req.getUsername());
+            req.getUsername());
 
     boolean status;
     IAuthorizer authorizer;
@@ -232,10 +190,10 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       tsStatus = getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR);
     }
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
-        TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
     resp.setSessionId(sessionId);
     logger.info("{}: Login status: {}. User : {}", 
IoTDBConstant.GLOBAL_DB_NAME,
-        tsStatus.getStatusType().getMessage(), req.getUsername());
+            tsStatus.getStatusType().getMessage(), req.getUsername());
 
     return resp;
   }
@@ -271,9 +229,9 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     }
     if (!exceptions.isEmpty()) {
       return new TSStatus(
-          getStatus(TSStatusCode.CLOSE_OPERATION_ERROR,
-              String.format("%d errors in closeOperation, see server logs for 
detail",
-                  exceptions.size())));
+              getStatus(TSStatusCode.CLOSE_OPERATION_ERROR,
+                      String.format("%d errors in closeOperation, see server 
logs for detail",
+                              exceptions.size())));
     }
 
     return new TSStatus(tsStatus);
@@ -375,7 +333,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       }
     } catch (QueryProcessException | MetadataException | OutOfMemoryError e) {
       logger
-          .error(String.format("Failed to fetch timeseries %s's metadata", 
req.getColumnPath()), e);
+              .error(String.format("Failed to fetch timeseries %s's metadata", 
req.getColumnPath()), e);
       status = getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
       resp.setStatus(status);
       return resp;
@@ -417,7 +375,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     switch (statement) {
       case "merge":
         StorageEngine.getInstance()
-            
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+                
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
         return true;
       case "full merge":
         StorageEngine.getInstance().mergeAll(true);
@@ -463,16 +421,16 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       for (String statement : statements) {
         long t2 = System.currentTimeMillis();
         isAllSuccessful =
-            executeStatementInBatch(statement, batchErrorMessage, result,
-                req.getSessionId()) && isAllSuccessful;
+                executeStatementInBatch(statement, batchErrorMessage, result,
+                        req.getSessionId()) && isAllSuccessful;
         
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, 
t2);
       }
       if (isAllSuccessful) {
         return 
getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS,
-            "Execute batch statements successfully"), result);
+                "Execute batch statements successfully"), result);
       } else {
         return 
getTSBatchExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-            batchErrorMessage.toString()), result);
+                batchErrorMessage.toString()), result);
       }
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, 
t1);
@@ -482,16 +440,16 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   // execute one statement of a batch. Currently, query is not allowed in a 
batch statement and
   // on finding queries in a batch, such query will be ignored and an error 
will be generated
   private boolean executeStatementInBatch(String statement, StringBuilder 
batchErrorMessage,
-      List<Integer> result, long sessionId) {
+                                          List<Integer> result, long 
sessionId) {
     try {
       PhysicalPlan physicalPlan = processor
-          .parseSQLToPhysicalPlan(statement, 
sessionIdZoneIdMap.get(sessionId));
+              .parseSQLToPhysicalPlan(statement, 
sessionIdZoneIdMap.get(sessionId));
       if (physicalPlan.isQuery()) {
         throw new QueryInBatchStatementException(statement);
       }
       TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan, 
sessionId);
       if (resp.getStatus().getStatusType().getCode() == 
TSStatusCode.SUCCESS_STATUS
-          .getStatusCode()) {
+              .getStatusCode()) {
         result.add(Statement.SUCCESS_NO_INFO);
       } else {
         result.add(Statement.EXECUTE_FAILED);
@@ -510,8 +468,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       return false;
     } catch (QueryProcessException e) {
       logger.info(
-          "Error occurred when executing {}, meet error while parsing SQL to 
physical plan: {}",
-          statement, e.getMessage());
+              "Error occurred when executing {}, meet error while parsing SQL 
to physical plan: {}",
+              statement, e.getMessage());
       result.add(Statement.EXECUTE_FAILED);
       
batchErrorMessage.append(TSStatusCode.SQL_PARSE_ERROR.getStatusCode()).append("\n");
       return false;
@@ -539,13 +497,13 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
       if (execAdminCommand(statement, req.getSessionId())) {
         return getTSExecuteStatementResp(
-            getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS"));
+                getStatus(TSStatusCode.SUCCESS_STATUS, 
"ADMIN_COMMAND_SUCCESS"));
       }
       PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement,
-          sessionIdZoneIdMap.get(req.getSessionId()));
+              sessionIdZoneIdMap.get(req.getSessionId()));
       if (physicalPlan.isQuery()) {
         resp = executeQueryStatement(req.statementId, physicalPlan, 
req.fetchSize,
-            sessionIdUsernameMap.get(req.getSessionId()));
+                sessionIdUsernameMap.get(req.getSessionId()));
         long endTime = System.currentTimeMillis();
         sqlArgument = new SqlArgument(resp, physicalPlan, statement, 
startTime, endTime);
         sqlArgumentsList.add(sqlArgument);
@@ -562,15 +520,15 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (SQLParserException e) {
       logger.error("check metadata error: ", e);
       return getTSExecuteStatementResp(getStatus(TSStatusCode.METADATA_ERROR,
-          "Check metadata error: " + e.getMessage()));
+              "Check metadata error: " + e.getMessage()));
     } catch (QueryProcessException e) {
       logger.info(ERROR_PARSING_SQL, e.getMessage());
       return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR,
-          "Statement format is not right: " + e.getMessage()));
+              "Statement format is not right: " + e.getMessage()));
     } catch (StorageEngineException e) {
       logger.info(ERROR_PARSING_SQL, e.getMessage());
       return 
getTSExecuteStatementResp(getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR,
-          e.getMessage()));
+              e.getMessage()));
     }
   }
 
@@ -579,7 +537,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
    * AuthorPlan
    */
   private TSExecuteStatementResp executeQueryStatement(long statementId, 
PhysicalPlan plan,
-      int fetchSize, String username) {
+                                                       int fetchSize, String 
username) {
     long t1 = System.currentTimeMillis();
     try {
       TSExecuteStatementResp resp; // column headers
@@ -627,7 +585,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSExecuteStatementResp(
-          getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+              getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
     }
@@ -644,7 +602,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     PhysicalPlan physicalPlan;
     try {
       physicalPlan = processor
-          .parseSQLToPhysicalPlan(statement, 
sessionIdZoneIdMap.get(req.getSessionId()));
+              .parseSQLToPhysicalPlan(statement, 
sessionIdZoneIdMap.get(req.getSessionId()));
     } catch (QueryProcessException | SQLParserException e) {
       logger.info(ERROR_PARSING_SQL, e.getMessage());
       return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR, 
e.getMessage()));
@@ -652,14 +610,14 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
     if (!physicalPlan.isQuery()) {
       return 
getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "Statement is not a query statement."));
+              "Statement is not a query statement."));
     }
     return executeQueryStatement(req.statementId, physicalPlan, req.fetchSize,
-        sessionIdUsernameMap.get(req.getSessionId()));
+            sessionIdUsernameMap.get(req.getSessionId()));
   }
 
   private TSExecuteStatementResp getShowQueryColumnHeaders(ShowPlan showPlan)
-      throws QueryProcessException {
+          throws QueryProcessException {
     switch (showPlan.getShowContentType()) {
       case TTL:
         return StaticResps.TTL_RESP;
@@ -686,7 +644,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       default:
         logger.error("Unsupported show content type: {}", 
showPlan.getShowContentType());
         throw new QueryProcessException(
-            "Unsupported show content type:" + showPlan.getShowContentType());
+                "Unsupported show content type:" + 
showPlan.getShowContentType());
     }
   }
 
@@ -705,7 +663,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
         return StaticResps.LIST_USER_PRIVILEGE_RESP;
       default:
         return 
getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR,
-            String.format("%s is not an auth query", 
authorPlan.getAuthorType())));
+                String.format("%s is not an auth query", 
authorPlan.getAuthorType())));
     }
   }
 
@@ -714,7 +672,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
    * get ResultSet schema
    */
   private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan 
physicalPlan, String username)
-      throws AuthException, TException, QueryProcessException {
+          throws AuthException, TException, QueryProcessException {
 
     List<String> respColumns = new ArrayList<>();
     List<String> columnsTypes = new ArrayList<>();
@@ -722,7 +680,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     // check permissions
     if (!checkAuthorization(physicalPlan.getPaths(), physicalPlan, username)) {
       return 
getTSExecuteStatementResp(getStatus(TSStatusCode.NO_PERMISSION_ERROR,
-          "No permissions for this operation " + 
physicalPlan.getOperatorType()));
+              "No permissions for this operation " + 
physicalPlan.getOperatorType()));
     }
 
     TSExecuteStatementResp resp = 
getTSExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS));
@@ -734,7 +692,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       // set dataTypeList in TSExecuteStatementResp. Note this is without 
deduplication.
       resp.setColumns(respColumns);
       resp.setDataTypeList(columnsTypes);
-    } 
+    }
     else {
       getWideQueryHeaders(plan, respColumns, columnsTypes);
       resp.setColumns(respColumns);
@@ -745,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
   // wide means not group by device
   private void getWideQueryHeaders(QueryPlan plan, List<String> respColumns,
-      List<String> columnTypes) throws TException, QueryProcessException {
+                                   List<String> columnTypes) throws 
TException, QueryProcessException {
     // Restore column header of aggregate to func(column_name), only
     // support single aggregate function for now
     List<Path> paths = plan.getPaths();
@@ -778,7 +736,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   }
 
   private void getGroupByDeviceQueryHeaders(QueryPlan plan, List<String> 
respColumns,
-      List<String> columnTypes) {
+                                            List<String> columnTypes) {
     // set columns in TSExecuteStatementResp. Note this is without 
deduplication.
     List<String> measurementColumns = plan.getMeasurements();
     respColumns.add(SQLConstant.GROUPBY_DEVICE_COLUMN_NAME);
@@ -823,19 +781,19 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
       if (!queryId2DataSet.containsKey(req.queryId)) {
         return getTSFetchResultsResp(
-            getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed 
query"));
+                getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not 
executed query"));
       }
 
       QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
       if (req.isAlign) {
         TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
-            sessionIdUsernameMap.get(req.sessionId));
+                sessionIdUsernameMap.get(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
           queryId2DataSet.remove(req.queryId);
         }
         TSFetchResultsResp resp = 
getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
-            "FetchResult successfully. Has more result: " + hasResultSet));
+                "FetchResult successfully. Has more result: " + hasResultSet));
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
@@ -843,13 +801,19 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       }
       else {
         TSQueryNonAlignDataSet nonAlignResult = 
fillRpcNonAlignReturnData(req.fetchSize, queryDataSet,
-            sessionIdUsernameMap.get(req.sessionId));
-        boolean hasResultSet = nonAlignResult.getTimeList().get(0).limit() != 
0;
+                sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = false;
+        for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
+          if (timeBuffer.limit() != 0) {
+            hasResultSet = true;
+            break;
+          }
+        }
         if (!hasResultSet) {
           queryId2DataSet.remove(req.queryId);
         }
         TSFetchResultsResp resp = 
getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
-            "FetchResult successfully. Has more result: " + hasResultSet));
+                "FetchResult successfully. Has more result: " + hasResultSet));
         resp.setHasResultSet(hasResultSet);
         resp.setNonAlignQueryDataSet(nonAlignResult);
         resp.setIsAlign(false);
@@ -877,7 +841,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
         encoder = new GroupedLSBWatermarkEncoder(config);
       } else {
         throw new UnSupportedDataTypeException(String.format(
-            "Watermark method is not supported yet: %s", 
config.getWatermarkMethodName()));
+                "Watermark method is not supported yet: %s", 
config.getWatermarkMethodName()));
       }
       if (queryDataSet instanceof NewEngineDataSetWithoutValueFilter) {
         // optimize for query without value filter
@@ -895,9 +859,9 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     }
     return result;
   }
-  
-  private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, 
QueryDataSet queryDataSet, 
-      String userName) throws TException, AuthException, IOException, 
InterruptedException {
+
+  private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, 
QueryDataSet queryDataSet,
+                                                           String userName) 
throws TException, AuthException, IOException, InterruptedException {
     IAuthorizer authorizer;
     try {
       authorizer = LocalFileAuthorizer.getInstance();
@@ -905,30 +869,30 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       throw new TException(e);
     }
     TSQueryNonAlignDataSet result;
-    
+
     if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) 
{
       WatermarkEncoder encoder;
       if 
(config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
         encoder = new GroupedLSBWatermarkEncoder(config);
       } else {
         throw new UnSupportedDataTypeException(String.format(
-            "Watermark method is not supported yet: %s", 
config.getWatermarkMethodName()));
+                "Watermark method is not supported yet: %s", 
config.getWatermarkMethodName()));
       }
       result = ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, 
encoder);
     } else {
       result = ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, 
null);
     }
     return result;
-    
+
   }
-  
-  
+
+
 
   /**
    * create QueryDataSet and buffer it for fetchResults
    */
   private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan 
physicalPlan) throws
-      QueryProcessException, QueryFilterOptimizationException, 
StorageEngineException, IOException, MetadataException, SQLException {
+          QueryProcessException, QueryFilterOptimizationException, 
StorageEngineException, IOException, MetadataException, SQLException {
 
     QueryContext context = new QueryContext(queryId);
     QueryDataSet queryDataSet = 
processor.getExecutor().processQuery(physicalPlan, context);
@@ -948,7 +912,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSExecuteStatementResp(
-          getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+              getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
     }
   }
 
@@ -968,7 +932,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   private boolean executeNonQuery(PhysicalPlan plan) throws 
QueryProcessException {
     if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
       throw new QueryProcessException(
-          "Current system mode is read-only, does not support non-query 
operation");
+              "Current system mode is read-only, does not support non-query 
operation");
     }
     return processor.getExecutor().processNonQuery(plan);
   }
@@ -985,7 +949,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
     if (physicalPlan.isQuery()) {
       return 
getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "Statement is a query statement."));
+              "Statement is a query statement."));
     }
 
     return executeUpdateStatement(physicalPlan, sessionId);
@@ -1001,7 +965,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   }
 
   private boolean checkAuthorization(List<Path> paths, PhysicalPlan plan, 
String username)
-      throws AuthException {
+          throws AuthException {
     String targetUser = null;
     if (plan instanceof AuthorPlan) {
       targetUser = ((AuthorPlan) plan).getUserName();
@@ -1017,7 +981,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   }
 
   private TSExecuteBatchStatementResp getTSBatchExecuteStatementResp(TSStatus 
status,
-      List<Integer> result) {
+                                                                     
List<Integer> result) {
     TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
     TSStatus tsStatus = new TSStatus(status);
     resp.setStatus(tsStatus);
@@ -1077,7 +1041,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     
properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
     
properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
     properties
-        
.setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+            
.setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
     return properties;
   }
 
@@ -1186,10 +1150,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       BatchInsertPlan batchInsertPlan = new BatchInsertPlan(req.deviceId, 
req.measurements);
       
batchInsertPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, 
req.size));
       batchInsertPlan.setColumns(QueryDataSetUtils
-          .readValuesFromBuffer(req.values, req.types, 
req.measurements.size(), req.size));
+              .readValuesFromBuffer(req.values, req.types, 
req.measurements.size(), req.size));
       batchInsertPlan.setRowCount(req.size);
-      batchInsertPlan.setTimeBuffer(req.timestamps);
-      batchInsertPlan.setValueBuffer(req.values);
       batchInsertPlan.setDataTypes(req.types);
 
       boolean isAllSuccessful = true;
@@ -1209,16 +1171,16 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       if (isAllSuccessful) {
         logger.debug("Insert one RowBatch successfully");
         return 
getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS),
-            Arrays.asList(results));
+                Arrays.asList(results));
       } else {
         logger.debug("Insert one RowBatch failed!");
         return 
getTSBatchExecuteStatementResp(getStatus(TSStatusCode.INTERNAL_SERVER_ERROR),
-            Arrays.asList(results));
+                Arrays.asList(results));
       }
     } catch (Exception e) {
       logger.info("{}: error occurs when executing statements", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSBatchExecuteStatementResp(
-          getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), 
null);
+              getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), 
null);
     } finally {
       
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, 
t1);
     }
@@ -1264,8 +1226,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       return getStatus(TSStatusCode.NOT_LOGIN_ERROR);
     }
     CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new 
Path(req.getPath()),
-        TSDataType.values()[req.getDataType()], 
TSEncoding.values()[req.getEncoding()],
-        CompressionType.values()[req.compressor], new HashMap<>());
+            TSDataType.values()[req.getDataType()], 
TSEncoding.values()[req.getEncoding()],
+            CompressionType.values()[req.compressor], new HashMap<>());
     TSStatus status = checkAuthority(plan, req.getSessionId());
     if (status != null) {
       return new TSStatus(status);
@@ -1303,7 +1265,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     try {
       if (!checkAuthorization(paths, plan, 
sessionIdUsernameMap.get(sessionId))) {
         return getStatus(TSStatusCode.NO_PERMISSION_ERROR,
-            "No permissions for this operation " + 
plan.getOperatorType().toString());
+                "No permissions for this operation " + 
plan.getOperatorType().toString());
       }
     } catch (AuthException e) {
       logger.error("meet error while checking authorization.", e);
@@ -1322,7 +1284,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     }
 
     return execRet ? getStatus(TSStatusCode.SUCCESS_STATUS, "Execute 
successfully")
-        : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+            : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
   private long generateQueryId(boolean isDataQuery) {

Reply via email to