This is an automated email from the ASF dual-hosted git repository. suyue pushed a commit to branch time_statstic in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 55009146592c2578e0865add05c650d505cd8e2a Author: suyue <[email protected]> AuthorDate: Thu May 30 18:36:28 2019 +0800 add time cost statstic --- .../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +- .../apache/iotdb/db/cost/stastic/Measurement.java | 94 ++++++++++++++++++++++ .../apache/iotdb/db/cost/stastic/Operation.java | 47 +++++++++++ .../iotdb/db/engine/filenode/FileNodeManager.java | 23 ++++++ .../org/apache/iotdb/db/qp/QueryProcessor.java | 11 ++- .../iotdb/db/qp/executor/OverflowQPExecutor.java | 6 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 27 ++++++- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 1 - 8 files changed, 207 insertions(+), 5 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java index a1c3e44..b153690 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java @@ -37,7 +37,8 @@ public enum ThreadName { INDEX_SERVICE("Index-ServerServiceImpl"), SYNC_CLIENT("Sync-Client"), SYNC_SERVER("Sync-Server"), - SYNC_MONITOR("Sync-Monitor"); + SYNC_MONITOR("Sync-Monitor"), + TIME_COST_STATSTIC("TIME_COST_STATSTIC"); private String name; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java new file mode 100644 index 0000000..ff7affc --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.cost.stastic; + +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.concurrent.ThreadName; + +public class Measurement { + + private Map<Operation, AtomicLong> operationLatencies; + private Map<Operation, AtomicLong> operationCnt; + private ScheduledExecutorService service; + + public static final Measurement INSTANCE = MeasurementHolder.MEASUREMENT; + + private Measurement() { + operationLatencies = new ConcurrentHashMap<>(); + for (Operation operation : Operation.values()) { + operationLatencies.put(operation, new AtomicLong(0)); + } + + operationCnt = new ConcurrentHashMap<>(); + for (Operation operation : Operation.values()) { + operationCnt.put(operation, new AtomicLong(0)); + } + + service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, + ThreadName.TIME_COST_STATSTIC.getName()); + service.scheduleWithFixedDelay( + new DisplayRunnable(), 30, 60, TimeUnit.SECONDS); + System.out.println("AFTER SERVICE:"+Operation.values()); + } + + public void addOperationLatency(Operation op, long latency) { + operationLatencies.get(op).getAndAdd(latency); + operationCnt.get(op).incrementAndGet(); + } + + public void showMeasurements() { + Date date = new Date(); + System.out.println("--------------------------------"+String.format("%s Measurement (ms)", date.toString())+"-----------------------------------"); + String head = String.format("%-45s%-30s%-30s%-30s","OPERATION", "COUNT", "TOTAL_TIME", "AVG_TIME"); + System.out.println(head); + for(Operation operation : Operation.values()){ + long cnt = operationCnt.get(operation).get(); + long totalInMs = operationLatencies.get(operation).get() / 1000000; + String avg = String.format("%.4f", (totalInMs/(cnt+1e-9))); + String item = String.format("%-45s%-30s%-30s%-30s", operation.name, cnt+"", totalInMs+"", avg); + System.out.println(item); + } + System.out.println( + "-----------------------------------------------------------------------------------------------------------------"); + } + + class DisplayRunnable implements Runnable{ + @Override + public void run() { + showMeasurements(); + } + } + + private static class MeasurementHolder{ + private static final Measurement MEASUREMENT = new Measurement(); + private MeasurementHolder(){} + } + + public static void main(String[] args){ + Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, 90L); + System.out.println("hhhhhh"); + } +} + diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java new file mode 100644 index 0000000..d140fe9 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.cost.stastic; + +public enum Operation { + EXECUTE_BATCH_SQL("EXECUTE_BATCH_SQL"), + PARSE_SQL_TO_PHYSICAL_PLAN("1 PARSE_SQL_TO_PHYSICAL_PLAN"), + GENERATE_AST_NODE("1.1 GENERATE_AST_NODE"), + GENERATE_PHYSICAL_PLAN("1.2 GENERATE_PHYSICAL_PLAN"), + EXECUTE_PHYSICAL_PLAN("2 EXECUTE_PHYSICAL_PLAN"), + CHECK_AUTHORIZATION("2.1 CHECK_AUTHORIZATION"), + EXECUTE_NON_QUERY("2.2 EXECUTE_NON_QUERY"), + CONSTRUCT_TSRECORD("2.2.1 CONSTRUCT_TSRECORD"), + GET_FILENODE_PROCESSOR("2.2.2 GET_FILENODE_PROCESSOR(ADD LOCK)"), + INSERT_BUFFER_WRITE_OR_OVERFLOW("2.2.3 INSERT_BUFFER_WRITE_OR_OVERFLOW"), + GET_BUFFER_WRITE_PROFESSOR("2.2.3.1 GET_BUFFER_WRITE_PROFESSOR"), + WRITE_WAL("2.2.3.2 WRITE_WAL"), + WRITE_MEM_TABLE("2.2.3.3 WRITE_MEM_TABLE"), + CONSTRUCT_JDBC_RESULT("2.3 CONSTRUCT_JDBC_RESULT"),; + + public String getName() { + return name; + } + + String name; + + Operation(String name) { + this.name = name; + } + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java index 516abdc..1eff4f3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java @@ -36,6 +36,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; +import org.apache.iotdb.db.cost.stastic.Measurement; +import org.apache.iotdb.db.cost.stastic.Operation; import org.apache.iotdb.db.engine.Processor; import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; @@ -282,10 +284,16 @@ public class FileNodeManager implements IStatistic, IService { checkTimestamp(tsRecord); updateStat(isMonitor, tsRecord); + long t0 = System.nanoTime(); FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true); + long t1 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.GET_FILENODE_PROCESSOR, t1-t0); + int insertType; try { + + long t2 = System.nanoTime(); long lastUpdateTime = fileNodeProcessor.getFlushLastUpdateTime(deviceId); if (timestamp < lastUpdateTime) { insertOverflow(fileNodeProcessor, timestamp, tsRecord, isMonitor, deviceId); @@ -294,6 +302,9 @@ public class FileNodeManager implements IStatistic, IService { insertBufferWrite(fileNodeProcessor, timestamp, isMonitor, tsRecord, deviceId); insertType = 2; } + long t3 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.INSERT_BUFFER_WRITE_OR_OVERFLOW, t3-t2); + } catch (FileNodeProcessorException e) { LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.", fileNodeProcessor.getProcessorName()), e); @@ -394,7 +405,10 @@ public class FileNodeManager implements IStatistic, IService { BufferWriteProcessor bufferWriteProcessor; String filenodeName = fileNodeProcessor.getProcessorName(); try { + long t0 = System.nanoTime(); bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp); + long t1 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.GET_BUFFER_WRITE_PROFESSOR,t1-t0); } catch (FileNodeProcessorException e) { LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}", filenodeName, timestamp); @@ -417,9 +431,16 @@ public class FileNodeManager implements IStatistic, IService { throw new FileNodeManagerException(e); } } + // write wal + long t2 = System.nanoTime(); writeLog(tsRecord, isMonitor, bufferWriteProcessor.getLogNode()); + long t3 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.WRITE_WAL,t3-t2); + + // Write data + long t4 = System.nanoTime(); long prevStartTime = fileNodeProcessor.getIntervalFileNodeStartTime(deviceId); long prevUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId); @@ -437,6 +458,8 @@ public class FileNodeManager implements IStatistic, IService { } throw new FileNodeManagerException(e); } + long t5 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.WRITE_MEM_TABLE,t5-t4); if (bufferWriteProcessor .getFileSize() > IoTDBDescriptor.getInstance() diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java index 74a1abf..acf976c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java @@ -21,6 +21,8 @@ package org.apache.iotdb.db.qp; import java.time.ZoneId; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.cost.stastic.Measurement; +import org.apache.iotdb.db.cost.stastic.Operation; import org.apache.iotdb.db.exception.ArgsErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.exception.qp.IllegalASTFormatException; @@ -68,11 +70,18 @@ public class QueryProcessor { public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId) throws QueryProcessorException, ArgsErrorException, ProcessorException { + long t0 = System.nanoTime(); AstNode astNode = parseSQLToAST(sqlStr); + long t1 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_AST_NODE, t1-t0); + long t2 = System.nanoTime(); Operator operator = parseASTToOperator(astNode, zoneId); operator = logicalOptimize(operator, executor); PhysicalGenerator physicalGenerator = new PhysicalGenerator(executor); - return physicalGenerator.transformToPhysicalPlan(operator); + PhysicalPlan qp = physicalGenerator.transformToPhysicalPlan(operator); + long t3 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_PHYSICAL_PLAN, t3-t2); + return qp; } /** diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java index 9ca337e..bc34438 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java @@ -31,6 +31,8 @@ import org.apache.iotdb.db.auth.entity.PathPrivilege; import org.apache.iotdb.db.auth.entity.PrivilegeType; import org.apache.iotdb.db.auth.entity.Role; import org.apache.iotdb.db.auth.entity.User; +import org.apache.iotdb.db.cost.stastic.Measurement; +import org.apache.iotdb.db.cost.stastic.Operation; import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.exception.ArgsErrorException; import org.apache.iotdb.db.exception.FileNodeManagerException; @@ -269,8 +271,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor { String[] insertValues) throws ProcessorException { try { + long t0 = System.nanoTime(); TSRecord tsRecord = new TSRecord(insertTime, deviceId); - MNode node = mManager.getNodeByDeviceIdFromCache(deviceId); for (int i = 0; i < measurementList.length; i++) { @@ -292,6 +294,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[i], value); tsRecord.addTuple(dataPoint); } + long t1 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_TSRECORD,t1-t0); return fileNodeManager.insert(tsRecord, false); } catch (PathErrorException | FileNodeManagerException e) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 592b009..47de5bb 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -35,6 +35,8 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.cost.stastic.Measurement; +import org.apache.iotdb.db.cost.stastic.Operation; import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.exception.ArgsErrorException; import org.apache.iotdb.db.exception.FileNodeManagerException; @@ -434,6 +436,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) throws TException { + long st = System.nanoTime(); try { if (!checkLogin()) { LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); @@ -446,13 +449,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { for (String statement : statements) { try { + long t0 = System.nanoTime(); PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get()); + long t1 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.PARSE_SQL_TO_PHYSICAL_PLAN,t1-t0); physicalPlan.setProposer(username.get()); if (physicalPlan.isQuery()) { return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, "statement is query :" + statement, result); } + + long t2 = System.nanoTime(); TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan); + long t3 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_PHYSICAL_PLAN,t3-t2); + if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) { result.add(Statement.SUCCESS_NO_INFO); } else { @@ -471,6 +482,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { batchErrorMessage = errMessage; } } + if (isAllSuccessful) { return getTSBathExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, "Execute batch statements successfully", result); @@ -481,6 +493,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { LOGGER.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e); return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage(), null); } + finally { + long et = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_BATCH_SQL, et-st); + } } @Override @@ -739,7 +755,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan) { List<Path> paths = plan.getPaths(); - + long st = System.nanoTime(); try { if (!checkAuthorization(paths, plan)) { return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, @@ -750,16 +766,23 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, "Uninitialized authorizer " + e.getMessage()); } + long et = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, et-st); // TODO // In current version, we only return OK/ERROR // Do we need to add extra information of executive condition boolean execRet; try { + long t0 = System.nanoTime(); execRet = executeNonQuery(plan); + long t1 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_NON_QUERY,t1-t0); } catch (ProcessorException e) { LOGGER.debug("meet error while processing non-query. ", e); return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); } + + long t2 = System.nanoTime(); TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS; String msg = execRet ? "Execute successfully" : "Execute statement error."; TSExecuteStatementResp resp = getTSExecuteStatementResp(statusCode, msg); @@ -769,6 +792,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { TSOperationHandle operationHandle; operationHandle = new TSOperationHandle(operationId, false); resp.setOperationHandle(operationHandle); + long t3 = System.nanoTime(); + Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_JDBC_RESULT,t3-t2); return resp; } diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java index fe56bbc..75d3585 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java @@ -48,7 +48,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class IoTDBStatement implements Statement { private static final String SHOW_TIMESERIES_COMMAND_LOWERCASE = "show timeseries";
