This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch DisableAlign in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit a52c63d5ab7be97bf4bdd009aa66c783a7f2b5ed Author: JackieTien97 <[email protected]> AuthorDate: Tue Jan 14 15:56:58 2020 +0800 s --- .../org/apache/iotdb/db/service/TSServiceImpl.java | 210 +++++++++------------ 1 file changed, 86 insertions(+), 124 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3f4c4ca..78c0082 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -18,27 +18,6 @@ */ package org.apache.iotdb.db.service; -import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE; -import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE; -import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP; -import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL; -import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER; - -import java.io.IOException; -import java.sql.SQLException; -import java.sql.Statement; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Vector; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.AuthorityChecker; @@ -67,12 +46,7 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; -import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; -import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; -import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; -import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; -import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; -import org.apache.iotdb.db.qp.physical.sys.ShowPlan; +import org.apache.iotdb.db.qp.physical.sys.*; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter; @@ -81,34 +55,7 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.service.rpc.thrift.ServerProperties; -import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq; -import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; -import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; -import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; -import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; -import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; -import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq; -import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp; -import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp; -import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; -import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; -import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; -import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; -import org.apache.iotdb.service.rpc.thrift.TSIService; -import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq; -import org.apache.iotdb.service.rpc.thrift.TSInsertReq; -import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; -import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; -import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; -import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; -import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet; -import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; -import org.apache.iotdb.service.rpc.thrift.TSStatus; -import org.apache.iotdb.service.rpc.thrift.TSStatusType; +import org.apache.iotdb.service.rpc.thrift.*; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -121,6 +68,17 @@ import org.apache.thrift.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.ZoneId; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.db.conf.IoTDBConstant.*; + /** * Thrift RPC implementation at server side. */ @@ -196,7 +154,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return TSDataType.DOUBLE; default: throw new QueryProcessException( - "aggregate does not support " + aggrType + " function."); + "aggregate does not support " + aggrType + " function."); } } return MManager.getInstance().getSeriesType(path); @@ -205,7 +163,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { logger.info("{}: receive open session request from username {}", IoTDBConstant.GLOBAL_DB_NAME, - req.getUsername()); + req.getUsername()); boolean status; IAuthorizer authorizer; @@ -232,10 +190,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { tsStatus = getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR); } TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, - TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1); + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1); resp.setSessionId(sessionId); logger.info("{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME, - tsStatus.getStatusType().getMessage(), req.getUsername()); + tsStatus.getStatusType().getMessage(), req.getUsername()); return resp; } @@ -271,9 +229,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } if (!exceptions.isEmpty()) { return new TSStatus( - getStatus(TSStatusCode.CLOSE_OPERATION_ERROR, - String.format("%d errors in closeOperation, see server logs for detail", - exceptions.size()))); + getStatus(TSStatusCode.CLOSE_OPERATION_ERROR, + String.format("%d errors in closeOperation, see server logs for detail", + exceptions.size()))); } return new TSStatus(tsStatus); @@ -375,7 +333,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } catch (QueryProcessException | MetadataException | OutOfMemoryError e) { logger - .error(String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e); + .error(String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e); status = getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()); resp.setStatus(status); return resp; @@ -417,7 +375,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { switch (statement) { case "merge": StorageEngine.getInstance() - .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); return true; case "full merge": StorageEngine.getInstance().mergeAll(true); @@ -463,16 +421,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { for (String statement : statements) { long t2 = System.currentTimeMillis(); isAllSuccessful = - executeStatementInBatch(statement, batchErrorMessage, result, - req.getSessionId()) && isAllSuccessful; + executeStatementInBatch(statement, batchErrorMessage, result, + req.getSessionId()) && isAllSuccessful; Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2); } if (isAllSuccessful) { return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS, - "Execute batch statements successfully"), result); + "Execute batch statements successfully"), result); } else { return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, - batchErrorMessage.toString()), result); + batchErrorMessage.toString()), result); } } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1); @@ -482,16 +440,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // execute one statement of a batch. Currently, query is not allowed in a batch statement and // on finding queries in a batch, such query will be ignored and an error will be generated private boolean executeStatementInBatch(String statement, StringBuilder batchErrorMessage, - List<Integer> result, long sessionId) { + List<Integer> result, long sessionId) { try { PhysicalPlan physicalPlan = processor - .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId)); + .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId)); if (physicalPlan.isQuery()) { throw new QueryInBatchStatementException(statement); } TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan, sessionId); if (resp.getStatus().getStatusType().getCode() == TSStatusCode.SUCCESS_STATUS - .getStatusCode()) { + .getStatusCode()) { result.add(Statement.SUCCESS_NO_INFO); } else { result.add(Statement.EXECUTE_FAILED); @@ -510,8 +468,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return false; } catch (QueryProcessException e) { logger.info( - "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}", - statement, e.getMessage()); + "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}", + statement, e.getMessage()); result.add(Statement.EXECUTE_FAILED); batchErrorMessage.append(TSStatusCode.SQL_PARSE_ERROR.getStatusCode()).append("\n"); return false; @@ -539,13 +497,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (execAdminCommand(statement, req.getSessionId())) { return getTSExecuteStatementResp( - getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS")); + getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS")); } PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, - sessionIdZoneIdMap.get(req.getSessionId())); + sessionIdZoneIdMap.get(req.getSessionId())); if (physicalPlan.isQuery()) { resp = executeQueryStatement(req.statementId, physicalPlan, req.fetchSize, - sessionIdUsernameMap.get(req.getSessionId())); + sessionIdUsernameMap.get(req.getSessionId())); long endTime = System.currentTimeMillis(); sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime); sqlArgumentsList.add(sqlArgument); @@ -562,15 +520,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } catch (SQLParserException e) { logger.error("check metadata error: ", e); return getTSExecuteStatementResp(getStatus(TSStatusCode.METADATA_ERROR, - "Check metadata error: " + e.getMessage())); + "Check metadata error: " + e.getMessage())); } catch (QueryProcessException e) { logger.info(ERROR_PARSING_SQL, e.getMessage()); return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR, - "Statement format is not right: " + e.getMessage())); + "Statement format is not right: " + e.getMessage())); } catch (StorageEngineException e) { logger.info(ERROR_PARSING_SQL, e.getMessage()); return getTSExecuteStatementResp(getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR, - e.getMessage())); + e.getMessage())); } } @@ -579,7 +537,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { * AuthorPlan */ private TSExecuteStatementResp executeQueryStatement(long statementId, PhysicalPlan plan, - int fetchSize, String username) { + int fetchSize, String username) { long t1 = System.currentTimeMillis(); try { TSExecuteStatementResp resp; // column headers @@ -627,7 +585,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } catch (Exception e) { logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e); return getTSExecuteStatementResp( - getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage())); + getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage())); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1); } @@ -644,7 +602,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PhysicalPlan physicalPlan; try { physicalPlan = processor - .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId())); + .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId())); } catch (QueryProcessException | SQLParserException e) { logger.info(ERROR_PARSING_SQL, e.getMessage()); return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage())); @@ -652,14 +610,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (!physicalPlan.isQuery()) { return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, - "Statement is not a query statement.")); + "Statement is not a query statement.")); } return executeQueryStatement(req.statementId, physicalPlan, req.fetchSize, - sessionIdUsernameMap.get(req.getSessionId())); + sessionIdUsernameMap.get(req.getSessionId())); } private TSExecuteStatementResp getShowQueryColumnHeaders(ShowPlan showPlan) - throws QueryProcessException { + throws QueryProcessException { switch (showPlan.getShowContentType()) { case TTL: return StaticResps.TTL_RESP; @@ -686,7 +644,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { default: logger.error("Unsupported show content type: {}", showPlan.getShowContentType()); throw new QueryProcessException( - "Unsupported show content type:" + showPlan.getShowContentType()); + "Unsupported show content type:" + showPlan.getShowContentType()); } } @@ -705,7 +663,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return StaticResps.LIST_USER_PRIVILEGE_RESP; default: return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR, - String.format("%s is not an auth query", authorPlan.getAuthorType()))); + String.format("%s is not an auth query", authorPlan.getAuthorType()))); } } @@ -714,7 +672,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { * get ResultSet schema */ private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan, String username) - throws AuthException, TException, QueryProcessException { + throws AuthException, TException, QueryProcessException { List<String> respColumns = new ArrayList<>(); List<String> columnsTypes = new ArrayList<>(); @@ -722,7 +680,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // check permissions if (!checkAuthorization(physicalPlan.getPaths(), physicalPlan, username)) { return getTSExecuteStatementResp(getStatus(TSStatusCode.NO_PERMISSION_ERROR, - "No permissions for this operation " + physicalPlan.getOperatorType())); + "No permissions for this operation " + physicalPlan.getOperatorType())); } TSExecuteStatementResp resp = getTSExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS)); @@ -734,7 +692,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // set dataTypeList in TSExecuteStatementResp. Note this is without deduplication. resp.setColumns(respColumns); resp.setDataTypeList(columnsTypes); - } + } else { getWideQueryHeaders(plan, respColumns, columnsTypes); resp.setColumns(respColumns); @@ -745,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // wide means not group by device private void getWideQueryHeaders(QueryPlan plan, List<String> respColumns, - List<String> columnTypes) throws TException, QueryProcessException { + List<String> columnTypes) throws TException, QueryProcessException { // Restore column header of aggregate to func(column_name), only // support single aggregate function for now List<Path> paths = plan.getPaths(); @@ -778,7 +736,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } private void getGroupByDeviceQueryHeaders(QueryPlan plan, List<String> respColumns, - List<String> columnTypes) { + List<String> columnTypes) { // set columns in TSExecuteStatementResp. Note this is without deduplication. List<String> measurementColumns = plan.getMeasurements(); respColumns.add(SQLConstant.GROUPBY_DEVICE_COLUMN_NAME); @@ -823,19 +781,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (!queryId2DataSet.containsKey(req.queryId)) { return getTSFetchResultsResp( - getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query")); + getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query")); } QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId); if (req.isAlign) { TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet, - sessionIdUsernameMap.get(req.sessionId)); + sessionIdUsernameMap.get(req.sessionId)); boolean hasResultSet = result.bufferForTime().limit() != 0; if (!hasResultSet) { queryId2DataSet.remove(req.queryId); } TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS, - "FetchResult successfully. Has more result: " + hasResultSet)); + "FetchResult successfully. Has more result: " + hasResultSet)); resp.setHasResultSet(hasResultSet); resp.setQueryDataSet(result); resp.setIsAlign(true); @@ -843,13 +801,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } else { TSQueryNonAlignDataSet nonAlignResult = fillRpcNonAlignReturnData(req.fetchSize, queryDataSet, - sessionIdUsernameMap.get(req.sessionId)); - boolean hasResultSet = nonAlignResult.getTimeList().get(0).limit() != 0; + sessionIdUsernameMap.get(req.sessionId)); + boolean hasResultSet = false; + for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) { + if (timeBuffer.limit() != 0) { + hasResultSet = true; + break; + } + } if (!hasResultSet) { queryId2DataSet.remove(req.queryId); } TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS, - "FetchResult successfully. Has more result: " + hasResultSet)); + "FetchResult successfully. Has more result: " + hasResultSet)); resp.setHasResultSet(hasResultSet); resp.setNonAlignQueryDataSet(nonAlignResult); resp.setIsAlign(false); @@ -877,7 +841,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { encoder = new GroupedLSBWatermarkEncoder(config); } else { throw new UnSupportedDataTypeException(String.format( - "Watermark method is not supported yet: %s", config.getWatermarkMethodName())); + "Watermark method is not supported yet: %s", config.getWatermarkMethodName())); } if (queryDataSet instanceof NewEngineDataSetWithoutValueFilter) { // optimize for query without value filter @@ -895,9 +859,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } return result; } - - private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, QueryDataSet queryDataSet, - String userName) throws TException, AuthException, IOException, InterruptedException { + + private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, QueryDataSet queryDataSet, + String userName) throws TException, AuthException, IOException, InterruptedException { IAuthorizer authorizer; try { authorizer = LocalFileAuthorizer.getInstance(); @@ -905,30 +869,30 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { throw new TException(e); } TSQueryNonAlignDataSet result; - + if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) { WatermarkEncoder encoder; if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) { encoder = new GroupedLSBWatermarkEncoder(config); } else { throw new UnSupportedDataTypeException(String.format( - "Watermark method is not supported yet: %s", config.getWatermarkMethodName())); + "Watermark method is not supported yet: %s", config.getWatermarkMethodName())); } result = ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, encoder); } else { result = ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, null); } return result; - + } - - + + /** * create QueryDataSet and buffer it for fetchResults */ private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws - QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException { + QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException { QueryContext context = new QueryContext(queryId); QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan, context); @@ -948,7 +912,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } catch (Exception e) { logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e); return getTSExecuteStatementResp( - getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage())); + getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage())); } } @@ -968,7 +932,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException { if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) { throw new QueryProcessException( - "Current system mode is read-only, does not support non-query operation"); + "Current system mode is read-only, does not support non-query operation"); } return processor.getExecutor().processNonQuery(plan); } @@ -985,7 +949,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (physicalPlan.isQuery()) { return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, - "Statement is a query statement.")); + "Statement is a query statement.")); } return executeUpdateStatement(physicalPlan, sessionId); @@ -1001,7 +965,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } private boolean checkAuthorization(List<Path> paths, PhysicalPlan plan, String username) - throws AuthException { + throws AuthException { String targetUser = null; if (plan instanceof AuthorPlan) { targetUser = ((AuthorPlan) plan).getUserName(); @@ -1017,7 +981,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } private TSExecuteBatchStatementResp getTSBatchExecuteStatementResp(TSStatus status, - List<Integer> result) { + List<Integer> result) { TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp(); TSStatus tsStatus = new TSStatus(status); resp.setStatus(tsStatus); @@ -1077,7 +1041,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME); properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME); properties - .setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()); + .setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()); return properties; } @@ -1186,10 +1150,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { BatchInsertPlan batchInsertPlan = new BatchInsertPlan(req.deviceId, req.measurements); batchInsertPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size)); batchInsertPlan.setColumns(QueryDataSetUtils - .readValuesFromBuffer(req.values, req.types, req.measurements.size(), req.size)); + .readValuesFromBuffer(req.values, req.types, req.measurements.size(), req.size)); batchInsertPlan.setRowCount(req.size); - batchInsertPlan.setTimeBuffer(req.timestamps); - batchInsertPlan.setValueBuffer(req.values); batchInsertPlan.setDataTypes(req.types); boolean isAllSuccessful = true; @@ -1209,16 +1171,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (isAllSuccessful) { logger.debug("Insert one RowBatch successfully"); return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS), - Arrays.asList(results)); + Arrays.asList(results)); } else { logger.debug("Insert one RowBatch failed!"); return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.INTERNAL_SERVER_ERROR), - Arrays.asList(results)); + Arrays.asList(results)); } } catch (Exception e) { logger.info("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e); return getTSBatchExecuteStatementResp( - getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), null); + getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), null); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1); } @@ -1264,8 +1226,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return getStatus(TSStatusCode.NOT_LOGIN_ERROR); } CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new Path(req.getPath()), - TSDataType.values()[req.getDataType()], TSEncoding.values()[req.getEncoding()], - CompressionType.values()[req.compressor], new HashMap<>()); + TSDataType.values()[req.getDataType()], TSEncoding.values()[req.getEncoding()], + CompressionType.values()[req.compressor], new HashMap<>()); TSStatus status = checkAuthority(plan, req.getSessionId()); if (status != null) { return new TSStatus(status); @@ -1303,7 +1265,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { try { if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) { return getStatus(TSStatusCode.NO_PERMISSION_ERROR, - "No permissions for this operation " + plan.getOperatorType().toString()); + "No permissions for this operation " + plan.getOperatorType().toString()); } } catch (AuthException e) { logger.error("meet error while checking authorization.", e); @@ -1322,7 +1284,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } return execRet ? getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully") - : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); + : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); } private long generateQueryId(boolean isDataQuery) {
