Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 feec7c4ff -> 9dce7c11d
PHOENIX-2524 Fixes for pherf to run with queryserver (elserj) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9dce7c11 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9dce7c11 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9dce7c11 Branch: refs/heads/4.x-HBase-1.0 Commit: 9dce7c11dfb96ea6ceb8eaf417e8f511b1fcbd9b Parents: feec7c4 Author: Mujtaba <mujt...@apache.org> Authored: Fri Dec 18 15:39:40 2015 -0800 Committer: Mujtaba <mujt...@apache.org> Committed: Fri Dec 18 15:39:40 2015 -0800 ---------------------------------------------------------------------- .../src/build/components/all-common-jars.xml | 8 +++ .../java/org/apache/phoenix/pherf/Pherf.java | 19 ++++++- .../apache/phoenix/pherf/result/ResultUtil.java | 13 ++++- .../phoenix/pherf/result/file/Header.java | 1 + .../pherf/result/file/ResultFileDetails.java | 1 + .../apache/phoenix/pherf/util/PhoenixUtil.java | 57 ++++++++++++++++---- .../phoenix/pherf/workload/WriteWorkload.java | 33 +++++++----- 7 files changed, 105 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-assembly/src/build/components/all-common-jars.xml ---------------------------------------------------------------------- diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml index bed9f25..960c3c9 100644 --- a/phoenix-assembly/src/build/components/all-common-jars.xml +++ b/phoenix-assembly/src/build/components/all-common-jars.xml @@ -104,5 +104,13 @@ </excludes> <fileMode>0644</fileMode> </fileSet> + <fileSet> + <directory>${project.basedir}/../phoenix-pherf/target/</directory> + <outputDirectory>lib</outputDirectory> + <includes> + <include>phoenix-*.jar</include> + </includes> + <fileMode>0644</fileMode> + </fileSet> </fileSets> </component> http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java index eaf199a..b84183b 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java @@ -89,6 +89,8 @@ public class Pherf { options.addOption("label", true, "Label a run. Result file name will be suffixed with specified label"); options.addOption("compare", true, "Specify labeled run(s) to compare"); options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time."); + options.addOption("t", "thin", false, "Use the Phoenix Thin Driver"); + options.addOption("s", "server", true, "The URL for the Phoenix QueryServer"); } private final String zookeeper; @@ -109,6 +111,8 @@ public class Pherf { private final String label; private final String compareResults; private final CompareType compareType; + private final boolean thinDriver; + private final String queryServerUrl; public Pherf(String[] args) throws Exception { CommandLineParser parser = new PosixParser(); @@ -155,14 +159,27 @@ public class Pherf { label = command.getOptionValue("label", null); compareResults = command.getOptionValue("compare", null); compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM; + thinDriver = command.hasOption("thin"); + if (thinDriver) { + queryServerUrl = command.getOptionValue("server", "http://localhost:8765"); + } else { + queryServerUrl = null; + } if ((command.hasOption("h") || (args == null || args.length == 0)) && !command .hasOption("listFiles")) { hf.printHelp("Pherf", options); System.exit(1); } - PhoenixUtil.setZookeeper(zookeeper); PhoenixUtil.setRowCountOverride(rowCountOverride); + if (!thinDriver) { + logger.info("Using thick driver with ZooKeepers '{}'", zookeeper); + PhoenixUtil.setZookeeper(zookeeper); + } else { + logger.info("Using thin driver with PQS '{}'", queryServerUrl); + // Enables the thin-driver and sets the PQS URL + PhoenixUtil.useThinDriver(queryServerUrl); + } ResultUtil.setFileSuffix(label); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java index 92eb80a..0c2a7b8 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java @@ -84,7 +84,12 @@ public class ResultUtil { ensureBaseResultDirExists(); CSVResultHandler writer = null; - ResultFileDetails resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD; + ResultFileDetails resultFileDetails; + if (PhoenixUtil.isThinDriver()) { + resultFileDetails = ResultFileDetails.CSV_THIN_AGGREGATE_DATA_LOAD; + } else { + resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD; + } try { writer = new CSVFileResultHandler(); writer.setResultFileDetails(resultFileDetails); @@ -92,7 +97,11 @@ public class ResultUtil { for (TableLoadTime loadTime : dataLoadTime.getTableLoadTime()) { List<ResultValue> rowValues = new ArrayList<>(); - rowValues.add(new ResultValue(PhoenixUtil.getZookeeper())); + if (PhoenixUtil.isThinDriver()) { + rowValues.add(new ResultValue(PhoenixUtil.getQueryServerUrl())); + } else { + rowValues.add(new ResultValue(PhoenixUtil.getZookeeper())); + } rowValues.addAll(loadTime.getCsvRepresentation(this)); Result result = http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java index 066fa7a..7d09f68 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java @@ -28,6 +28,7 @@ public enum Header { DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"), DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"), AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"), + THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"), MONITOR("STAT_NAME,STAT_VALUE,TIME_STAMP"); private String header; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java index a85f830..51aa407 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java @@ -24,6 +24,7 @@ public enum ResultFileDetails { CSV_DETAILED_PERFORMANCE(Header.DETAILED_PERFORMANCE, Extension.DETAILED_CSV), CSV_DETAILED_FUNCTIONAL(Header.DETAILED_FUNCTIONAL, Extension.DETAILED_CSV), CSV_AGGREGATE_DATA_LOAD(Header.AGGREGATE_DATA_LOAD, Extension.CSV), + CSV_THIN_AGGREGATE_DATA_LOAD(Header.THIN_AGGREGATE_DATA_LOAD, Extension.CSV), CSV_MONITOR(Header.MONITOR, Extension.CSV), XML(Header.EMPTY, Extension.XML), IMAGE(Header.EMPTY, Extension.VISUALIZATION); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java index 064c604..b778833 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java @@ -28,6 +28,7 @@ import java.sql.*; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Properties; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; @@ -39,6 +40,8 @@ public class PhoenixUtil { private static int rowCountOverride = 0; private boolean testEnabled; private static PhoenixUtil instance; + private static boolean useThinDriver; + private static String queryServerUrl; private PhoenixUtil() { this(false); @@ -57,6 +60,19 @@ public class PhoenixUtil { return instance; } + public static void useThinDriver(String queryServerUrl) { + PhoenixUtil.useThinDriver = true; + PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl); + } + + public static String getQueryServerUrl() { + return PhoenixUtil.queryServerUrl; + } + + public static boolean isThinDriver() { + return PhoenixUtil.useThinDriver; + } + public Connection getConnection() throws Exception { return getConnection(null); } @@ -66,17 +82,31 @@ public class PhoenixUtil { } private Connection getConnection(String tenantId, boolean testEnabled) throws Exception { - if (null == zookeeper) { - throw new IllegalArgumentException( - "Zookeeper must be set before initializing connection!"); - } - Properties props = new Properties(); - if (null != tenantId) { - props.setProperty("TenantId", tenantId); - logger.debug("\nSetting tenantId to " + tenantId); + if (useThinDriver) { + if (null == queryServerUrl) { + throw new IllegalArgumentException("QueryServer URL must be set before" + + " initializing connection"); + } + Properties props = new Properties(); + if (null != tenantId) { + props.setProperty("TenantId", tenantId); + logger.debug("\nSetting tenantId to " + tenantId); + } + String url = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF"; + return DriverManager.getConnection(url, props); + } else { + if (null == zookeeper) { + throw new IllegalArgumentException( + "Zookeeper must be set before initializing connection!"); + } + Properties props = new Properties(); + if (null != tenantId) { + props.setProperty("TenantId", tenantId); + logger.debug("\nSetting tenantId to " + tenantId); + } + String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : ""); + return DriverManager.getConnection(url, props); } - String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : ""); - return DriverManager.getConnection(url, props); } public boolean executeStatement(String sql, Scenario scenario) throws Exception { @@ -278,7 +308,12 @@ public class PhoenixUtil { public static void setZookeeper(String zookeeper) { logger.info("Setting zookeeper: " + zookeeper); - PhoenixUtil.zookeeper = zookeeper; + useThickDriver(zookeeper); + } + + public static void useThickDriver(String zookeeper) { + PhoenixUtil.useThinDriver = false; + PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper); } public static int getRowCountOverride() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dce7c11/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java index 7c73e22..a35e6e8 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -168,7 +168,7 @@ public class WriteWorkload implements Workload { // Execute any Scenario DDL before running workload pUtil.executeScenarioDdl(scenario); - List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario); + List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario); waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches); @@ -182,12 +182,12 @@ public class WriteWorkload implements Workload { } } - private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) + private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception { RowCalculator rowCalculator = new RowCalculator(getThreadPoolSize(), scenario.getRowCount()); - List<Future> writeBatches = new ArrayList<>(); + List<Future<Info>> writeBatches = new ArrayList<>(); for (int i = 0; i < getThreadPoolSize(); i++) { List<Column> @@ -212,7 +212,7 @@ public class WriteWorkload implements Workload { } private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario, - long start, List<Future> writeBatches) + long start, List<Future<Info>> writeBatches) throws InterruptedException, java.util.concurrent.ExecutionException { int sumRows = 0, sumDuration = 0; // Wait for all the batch threads to complete @@ -223,10 +223,12 @@ public class WriteWorkload implements Workload { logger.info("Executor (" + this.hashCode() + ") writes complete with row count (" + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")"); } - logger.info("Writes completed with total row count (" + sumRows + ") with total time of(" - + sumDuration + ") Ms"); + long testDuration = System.currentTimeMillis() - start; + logger.info("Writes completed with total row count (" + sumRows + + ") with total elapsed time of (" + testDuration + + ") ms and total CPU execution time of (" + sumDuration + ") ms"); dataLoadTimeSummary - .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start)); + .add(scenario.getTableName(), sumRows, (int) testDuration); } public Future<Info> upsertData(final Scenario scenario, final List<Column> columns, @@ -235,9 +237,10 @@ public class WriteWorkload implements Workload { Future<Info> future = pool.submit(new Callable<Info>() { @Override public Info call() throws Exception { int rowsCreated = 0; - long start = 0, duration, totalDuration; + long start = 0, last = 0, duration, totalDuration; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Connection connection = null; + PreparedStatement stmt = null; try { connection = pUtil.getConnection(scenario.getTenantId()); long logStartTime = System.currentTimeMillis(); @@ -247,17 +250,16 @@ public class WriteWorkload implements Workload { Long.MAX_VALUE : WriteWorkload.this.writeParams.getExecutionDurationInMs(); + last = start = System.currentTimeMillis(); + String sql = buildSql(columns, tableName); + stmt = connection.prepareStatement(sql); for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime) < maxDuration); i--) { - String sql = buildSql(columns, tableName); - PreparedStatement stmt = connection.prepareStatement(sql); stmt = buildStatement(scenario, columns, stmt, simpleDateFormat); - start = System.currentTimeMillis(); rowsCreated += stmt.executeUpdate(); - stmt.close(); if ((i % getBatchSize()) == 0) { connection.commit(); - duration = System.currentTimeMillis() - start; + duration = System.currentTimeMillis() - last; logger.info("Writer (" + Thread.currentThread().getName() + ") committed Batch. Total " + getBatchSize() + " rows for this thread (" + this.hashCode() + ") in (" @@ -272,9 +274,14 @@ public class WriteWorkload implements Workload { // Pause for throttling if configured to do so Thread.sleep(threadSleepDuration); + // Re-compute the start time for the next batch + last = System.currentTimeMillis(); } } } finally { + if (stmt != null) { + stmt.close(); + } if (connection != null) { try { connection.commit();