This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 669bd59 add execution timeout and fix exception stats (#6177) 669bd59 is described below commit 669bd595baa671f2420476da94524635ab1d14db Author: Alexander Pucher <apuc...@apache.org> AuthorDate: Thu Oct 22 13:13:54 2020 -0700 add execution timeout and fix exception stats (#6177) We add a "-timeout" param to the query runner to support time-limited execution, e.g. for performance benchmarks. This PR also fixes a display issue with query exception statistics not being reset on every interval. --- .../org/apache/pinot/tools/perf/QueryRunner.java | 82 +++++++++++++++------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java index 2ced758..d9d6d6a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java @@ -77,6 +77,8 @@ public class QueryRunner extends AbstractBaseCommand implements Command { private int _brokerPort = 8099; @Option(name = "-queueDepth", required = false, metaVar = "<int>", usage = "Queue size limit for multi-threaded execution (default 64).") private int _queueDepth = 64; + @Option(name = "-timeout", required = false, metaVar = "<long>", usage = "Timeout in milliseconds for completing all queries (default: unlimited).") + private long _timeout = 0; @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help; @@ -151,10 +153,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command { switch (_mode) { case "singleThread": LOGGER.info("MODE singleThread with queryFile: {}, numTimesToRunQueries: {}, reportIntervalMs: {}, " - + "numIntervalsToReportAndClearStatistics: {}", _queryFile, _numTimesToRunQueries, _reportIntervalMs, - _numIntervalsToReportAndClearStatistics); + + "numIntervalsToReportAndClearStatistics: {}, timeout: {}", _queryFile, _numTimesToRunQueries, + _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout); singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs, - _numIntervalsToReportAndClearStatistics); + _numIntervalsToReportAndClearStatistics, _timeout); break; case "multiThreads": if (_numThreads <= 0) { @@ -163,11 +165,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { break; } LOGGER.info("MODE multiThreads with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, " - + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}", _queryFile, - _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, - _queueDepth); + + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}", + _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, + _queueDepth, _timeout); multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _reportIntervalMs, - _numIntervalsToReportAndClearStatistics); + _numIntervalsToReportAndClearStatistics, _timeout); break; case "targetQPS": if (_numThreads <= 0) { @@ -181,11 +183,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { break; } LOGGER.info("MODE targetQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, " - + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}", _queryFile, - _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, - _queueDepth); + + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}", + _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs, + _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout); targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, - _reportIntervalMs, _numIntervalsToReportAndClearStatistics); + _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout); break; case "increasingQPS": if (_numThreads <= 0) { @@ -211,11 +213,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { } LOGGER.info("MODE increasingQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, " + "deltaQPS: {}, reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, " - + "numIntervalsToIncreaseQPS: {}, queueDepth: {}", _queryFile, _numTimesToRunQueries, _numThreads, - _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, - _numIntervalsToIncreaseQPS, _queueDepth); + + "numIntervalsToIncreaseQPS: {}, queueDepth: {}, timeout: {}", _queryFile, _numTimesToRunQueries, + _numThreads, _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, + _numIntervalsToIncreaseQPS, _queueDepth, _timeout); increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, _deltaQPS, - _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS); + _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout); break; default: LOGGER.error("Invalid mode: {}", _mode); @@ -237,10 +239,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { * @param reportIntervalMs report interval in milliseconds. * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear * them, 0 means never. + * @param timeout timeout in milliseconds for completing all queries. * @throws Exception */ public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries, - int reportIntervalMs, int numIntervalsToReportAndClearStatistics) + int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout) throws Exception { PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); int numQueriesExecuted = 0; @@ -249,6 +252,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { long totalClientTime = 0L; List<Statistics> statisticsList = Collections.singletonList(new Statistics(CLIENT_TIME_STATISTICS)); + final long startTimeAbsolute = System.currentTimeMillis(); long startTime = System.currentTimeMillis(); long reportStartTime = startTime; int numReportIntervals = 0; @@ -256,6 +260,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) { Iterator<String> itQuery = queries.iterator(); while (itQuery.hasNext()) { + if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) { + LOGGER.warn("Timeout of {} sec reached. Aborting", timeout); + return; + } + String query = itQuery.next(); JsonNode response = driver.postQuery(query); @@ -284,6 +293,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { numReportIntervals = 0; startTime = currentTime; numQueriesExecuted = 0; + numExceptions = 0; totalBrokerTime = 0L; totalClientTime = 0L; for (Statistics statistics : statisticsList) { @@ -325,10 +335,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { * @param reportIntervalMs report interval in milliseconds. * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear * them, 0 means never. + * @param timeout timeout in milliseconds for completing all queries * @throws Exception */ public static void multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries, - int numThreads, int queueDepth, int reportIntervalMs, int numIntervalsToReportAndClearStatistics) + int numThreads, int queueDepth, int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout) throws Exception { PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth); @@ -346,6 +357,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { } executorService.shutdown(); + final long startTimeAbsolute = System.currentTimeMillis(); long startTime = System.currentTimeMillis(); long reportStartTime = startTime; int numReportIntervals = 0; @@ -356,6 +368,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { return; } + if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) { + LOGGER.warn("Timeout of {} sec reached. Aborting", timeout); + return; + } + Iterator<String> itQuery = queries.iterator(); while (itQuery.hasNext()) { String query = itQuery.next(); @@ -381,7 +398,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { == numIntervalsToReportAndClearStatistics)) { numReportIntervals = 0; startTime = currentTime; - reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList); + reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList); } } } @@ -428,11 +445,12 @@ public class QueryRunner extends AbstractBaseCommand implements Command { * @param reportIntervalMs report interval in milliseconds. * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear * them, 0 means never. + * @param timeout timeout in milliseconds for completing all queries * @throws Exception */ public static void targetQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries, int numThreads, int queueDepth, double startQPS, int reportIntervalMs, - int numIntervalsToReportAndClearStatistics) + int numIntervalsToReportAndClearStatistics, long timeout) throws Exception { PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth); @@ -450,6 +468,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { } executorService.shutdown(); + final long startTimeAbsolute = System.currentTimeMillis(); final int queryIntervalNanos = (int) (1E9 / startQPS); long startTime = System.currentTimeMillis(); long reportStartTime = startTime; @@ -461,6 +480,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { return; } + if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) { + LOGGER.warn("Timeout of {} sec reached. Aborting", timeout); + return; + } + long nextQueryNanos = System.nanoTime(); Iterator<String> itQuery = queries.iterator(); while (itQuery.hasNext()) { @@ -495,7 +519,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { == numIntervalsToReportAndClearStatistics)) { numReportIntervals = 0; startTime = currentTime; - reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList); + reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList); } } } @@ -544,13 +568,14 @@ public class QueryRunner extends AbstractBaseCommand implements Command { * @param reportIntervalMs report interval in milliseconds. * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear * them, 0 means never. + * @param timeout timeout in milliseconds for completing all queries. * @param numIntervalsToIncreaseQPS number of intervals to increase QPS. * @throws Exception */ public static void increasingQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries, int numThreads, int queueDepth, double startQPS, double deltaQPS, int reportIntervalMs, - int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS) + int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS, long timeout) throws Exception { PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth); @@ -568,6 +593,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { } executorService.shutdown(); + final long startTimeAbsolute = System.currentTimeMillis(); long startTime = System.currentTimeMillis(); long reportStartTime = startTime; int numReportIntervals = 0; @@ -580,6 +606,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command { return; } + if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) { + LOGGER.warn("Timeout of {} sec reached. Aborting", timeout); + return; + } + long nextQueryNanos = System.nanoTime(); Iterator<String> itQuery = queries.iterator(); while (itQuery.hasNext()) { @@ -616,7 +647,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { totalClientTime.get() / (double) numQueriesExecutedInt, queryQueue.size()); numReportIntervals = 0; startTime = currentTime; - reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList); + reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList); currentQPS += deltaQPS; queryIntervalNanos = (long) (1E9 / currentQPS); @@ -633,7 +664,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command { if ((numIntervalsToReportAndClearStatistics != 0) && ( numReportIntervals % numIntervalsToReportAndClearStatistics == 0)) { startTime = currentTime; - reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList); + reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList); } } } @@ -679,9 +710,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command { } } - private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicLong totalBrokerTime, - AtomicLong totalClientTime, List<Statistics> statisticsList) { + private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicInteger numExceptions, + AtomicLong totalBrokerTime, AtomicLong totalClientTime, List<Statistics> statisticsList) { numQueriesExecuted.set(0); + numExceptions.set(0); totalBrokerTime.set(0L); totalClientTime.set(0L); for (Statistics statistics : statisticsList) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org