Repository: phoenix Updated Branches: refs/heads/master adf1c2df0 -> b1f0bf75e
PHOENIX-3868 Pherf - Create sync/async index as part of a scenario Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b1f0bf75 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b1f0bf75 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b1f0bf75 Branch: refs/heads/master Commit: b1f0bf75e6c4883d85e20ed700a109f1ec9b91d3 Parents: adf1c2d Author: Mujtaba <mujt...@apache.org> Authored: Tue Jun 6 11:54:33 2017 -0700 Committer: Mujtaba <mujt...@apache.org> Committed: Tue Jun 6 11:54:33 2017 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/pherf/DataIngestIT.java | 19 +++++ .../apache/phoenix/pherf/ResultBaseTestIT.java | 4 +- .../phoenix/pherf/configuration/DataModel.java | 1 + .../apache/phoenix/pherf/configuration/Ddl.java | 50 +++++++++++++ .../phoenix/pherf/configuration/Scenario.java | 49 ++++++++----- .../apache/phoenix/pherf/util/PhoenixUtil.java | 75 ++++++++++++++++++-- .../phoenix/pherf/workload/WriteWorkload.java | 14 ++-- .../scenario/prod_test_unsalted_scenario.xml | 11 ++- .../phoenix/pherf/ConfigurationParserTest.java | 5 ++ .../test/resources/scenario/test_scenario.xml | 26 ++++++- 10 files changed, 221 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java index 7b7ac29..973ce2c 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java @@ -101,6 +101,9 @@ public class DataIngestIT extends ResultBaseTestIT { } } + // Verify number of rows written + assertExpectedNumberOfRecordsWritten(scenario); + // Run some queries executor = new WorkloadExecutor(); Workload query = new QueryExecutor(parser, util, executor); @@ -114,6 +117,22 @@ public class DataIngestIT extends ResultBaseTestIT { } @Test + public void testPreAndPostDataLoadDdls() throws Exception { + Scenario scenario = parser.getScenarioByName("testPreAndPostDdls"); + WorkloadExecutor executor = new WorkloadExecutor(); + executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO)); + + try { + executor.get(); + executor.shutdown(); + } catch (Exception e) { + fail("Failed to load data. An exception was thrown: " + e.getMessage()); + } + + assertExpectedNumberOfRecordsWritten(scenario); + } + + @Test public void testRWWorkload() throws Exception { Connection connection = util.getConnection(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java index 1e77f48..2f4f4e5 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java @@ -48,11 +48,11 @@ public class ResultBaseTestIT extends BaseHBaseManagedTimeIT { String dir = properties.getProperty("pherf.default.results.dir"); resultUtil.ensureBaseDirExists(dir); - util.setZookeeper("localhost"); + PhoenixUtil.setZookeeper("localhost"); reader = new SchemaReader(util, matcherSchema); parser = new XMLConfigParser(matcherScenario); } - + @AfterClass public static void tearDown() throws Exception { resultUtil.deleteDir(properties.getProperty("pherf.default.results.dir")); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java index 8eb42ff..4c99ddd 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java @@ -71,4 +71,5 @@ public class DataModel { } return stringBuilder.toString(); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java new file mode 100644 index 0000000..e431040 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java @@ -0,0 +1,50 @@ +package org.apache.phoenix.pherf.configuration; + +import javax.xml.bind.annotation.XmlAttribute; + +public class Ddl { + private String statement; + private String tableName; + + public Ddl() { + } + + public Ddl(String statement, String tableName) { + this.statement = statement; + this.tableName = tableName; + } + + /** + * DDL + * @return + */ + @XmlAttribute + public String getStatement() { + return statement; + } + public void setStatement(String statement) { + this.statement = statement; + } + + /** + * Table name used in the DDL + * @return + */ + @XmlAttribute + public String getTableName() { + return tableName; + } + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String toString(){ + if (statement.contains("?")) { + return statement.replace("?", tableName); + } else { + return statement; + } + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java index 200fdc5..02e5cc7 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java @@ -24,6 +24,7 @@ import java.util.Map; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlRootElement; import org.apache.commons.lang.builder.HashCodeBuilder; @@ -39,8 +40,10 @@ public class Scenario { private WriteParams writeParams; private String name; private String tenantId; - private String ddl; - + private List<Ddl> preScenarioDdls; + private List<Ddl> postScenarioDdls; + + public Scenario() { writeParams = new WriteParams(); } @@ -92,7 +95,7 @@ public class Scenario { public void setRowCount(int rowCount) { this.rowCount = rowCount; } - + /** * Phoenix properties * @@ -179,18 +182,6 @@ public class Scenario { this.tenantId = tenantId; } - /** - * Scenario level DDL that is executed before running the scenario. - */ - @XmlAttribute - public String getDdl() { - return ddl; - } - - public void setDdl(String ddl) { - this.ddl = ddl; - } - public WriteParams getWriteParams() { return writeParams; } @@ -211,4 +202,30 @@ public class Scenario { } return stringBuilder.toString(); } -} + + public List<Ddl> getPreScenarioDdls() { + return preScenarioDdls; + } + + /** + * Scenario level DDLs (for views/index/async) that are executed before data load + */ + @XmlElementWrapper(name = "preScenarioDdls") + @XmlElement(name = "ddl") + public void setPreScenarioDdls(List<Ddl> preScenarioDdls) { + this.preScenarioDdls = preScenarioDdls; + } + + public List<Ddl> getPostScenarioDdls() { + return postScenarioDdls; + } + + /** + * Scenario level DDLs (for views/index/async) that are executed after data load + */ + @XmlElementWrapper(name = "postScenarioDdls") + @XmlElement(name = "ddl") + public void setPostScenarioDdls(List<Ddl> postScenarioDdls) { + this.postScenarioDdls = postScenarioDdls; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java index df18544..38dcd64 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java @@ -18,18 +18,24 @@ package org.apache.phoenix.pherf.util; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter; import org.apache.phoenix.pherf.PherfConstants; import org.apache.phoenix.pherf.configuration.*; import org.apache.phoenix.pherf.jmx.MonitorManager; +import org.apache.phoenix.pherf.result.DataLoadThreadTime; +import org.apache.phoenix.pherf.result.DataLoadTimeSummary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.sql.*; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.Set; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; @@ -42,6 +48,8 @@ public class PhoenixUtil { private static PhoenixUtil instance; private static boolean useThinDriver; private static String queryServerUrl; + private static final String ASYNC_KEYWORD = "ASYNC"; + private static final int ONE_MIN_IN_MS = 60000; private PhoenixUtil() { this(false); @@ -281,13 +289,19 @@ public class PhoenixUtil { * * @throws Exception */ - public void executeScenarioDdl(Scenario scenario) throws Exception { - if (null != scenario.getDdl()) { + public void executeScenarioDdl(List<Ddl> ddls, String tenantId, DataLoadTimeSummary dataLoadTimeSummary) throws Exception { + if (null != ddls) { Connection conn = null; try { - logger.info("\nExecuting DDL:" + scenario.getDdl() + " on tenantId:" - + scenario.getTenantId()); - executeStatement(scenario.getDdl(), conn = getConnection(scenario.getTenantId())); + for (Ddl ddl : ddls) { + logger.info("\nExecuting DDL:" + ddl + " on tenantId:" +tenantId); + long startTime = System.currentTimeMillis(); + executeStatement(ddl.toString(), conn = getConnection(tenantId)); + if (ddl.getStatement().toUpperCase().contains(ASYNC_KEYWORD)) { + waitForAsyncIndexToFinish(ddl.getTableName()); + } + dataLoadTimeSummary.add(ddl.getTableName(), 0, (int)(System.currentTimeMillis() - startTime)); + } } finally { if (null != conn) { conn.close(); @@ -296,7 +310,56 @@ public class PhoenixUtil { } } - public static String getZookeeper() { + /** + * Waits for ASYNC index to build + * @param tableName + * @throws InterruptedException + */ + private void waitForAsyncIndexToFinish(String tableName) throws InterruptedException { + //Wait for up to 15 mins for ASYNC index build to start + boolean jobStarted = false; + for (int i=0; i<15; i++) { + if (isYarnJobInProgress(tableName)) { + jobStarted = true; + break; + } + Thread.sleep(ONE_MIN_IN_MS); + } + if (jobStarted == false) { + throw new IllegalStateException("ASYNC index build did not start within 15 mins"); + } + + // Wait till ASYNC index job finishes to get approximate job E2E time + for (;;) { + if (!isYarnJobInProgress(tableName)) + break; + Thread.sleep(ONE_MIN_IN_MS); + } + } + + /** + * Checks if a YARN job with the specific table name is in progress + * @param tableName + * @return + */ + boolean isYarnJobInProgress(String tableName) { + try { + logger.info("Fetching YARN apps..."); + Set<String> response = new PhoenixMRJobSubmitter().getSubmittedYarnApps(); + for (String str : response) { + logger.info("Runnng YARN app: " + str); + if (str.toUpperCase().contains(tableName.toUpperCase())) { + return true; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + public static String getZookeeper() { return zookeeper; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java index 69d35cc..3574761 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -171,14 +171,13 @@ public class WriteWorkload implements Workload { private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception { logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName()); - long start = System.currentTimeMillis(); - // Execute any Scenario DDL before running workload - pUtil.executeScenarioDdl(scenario); - - List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario); + // Execute any pre dataload scenario DDLs + pUtil.executeScenarioDdl(scenario.getPreScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary); - waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches); + // Write data + List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario); + waitForBatches(dataLoadTimeSummary, scenario, System.currentTimeMillis(), writeBatches); // Update Phoenix Statistics if (this.generateStatistics == GeneratePhoenixStats.YES) { @@ -188,6 +187,9 @@ public class WriteWorkload implements Workload { } else { logger.info("Phoenix table stats update not requested."); } + + // Execute any post data load scenario DDLs before starting query workload + pUtil.executeScenarioDdl(scenario.getPostScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary); } private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml index 8f93685..e538ac2 100644 --- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml +++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml @@ -289,7 +289,6 @@ <maxValue>2014-10-17 00:00:00.000</maxValue> </datavalue> <datavalue distribution="2"> - <minValue>2014-10-17 00:00:00.000</minValue> <maxValue>2014-10-18 00:00:00.000</maxValue> </datavalue> <datavalue distribution="2"> @@ -315,6 +314,16 @@ <name>TENANT_ID</name> </column> </dataOverride> + + <preScenarioDdls> + <ddl>CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)</ddl> + </preScenarioDdls> + + <postScenarioDdls> + <ddl>CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)</ddl> + <ddl>CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)</ddl> + </postScenarioDdls> + <writeParams executionDurationInMs="10000"> <!-- Number of writer it insert into the threadpool http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java index 2f08bc0..a5c908e 100644 --- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java +++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java @@ -200,6 +200,11 @@ public class ConfigurationParserTest extends ResultBaseTest { data.setDataMappingColumns(columnList); Scenario scenario = new Scenario(); + scenario.setTenantId("00DXXXXXX"); + List<Ddl> preScenarioDdls = new ArrayList<Ddl>(); + preScenarioDdls.add(new Ddl("CREATE INDEX IF NOT EXISTS ? ON FHA (NEWVAL_NUMBER) ASYNC", "FHAIDX_NEWVAL_NUMBER")); + preScenarioDdls.add(new Ddl("CREATE LOCAL INDEX IF NOT EXISTS ? ON FHA (NEWVAL_NUMBER)", "FHAIDX_NEWVAL_NUMBER")); + scenario.setPreScenarioDdls(preScenarioDdls); scenario.setPhoenixProperties(new HashMap<String, String>()); scenario.getPhoenixProperties().put("phoenix.query.threadPoolSize", "200"); scenario.setDataOverride(new DataOverride()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1f0bf75/phoenix-pherf/src/test/resources/scenario/test_scenario.xml ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml index 34bf31a..e2915ba 100644 --- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml +++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml @@ -236,6 +236,7 @@ <name>FIELD</name> </column> </dataOverride> + <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first 2. DDL included in query are executed only once on start of querySet execution. --> @@ -255,6 +256,23 @@ <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/> </querySet> </scenario> + + <scenario tableName="PHERF.TEST_TABLE" rowCount="99" name="testPreAndPostDdls"> + <preScenarioDdls> + <ddl statement="CREATE INDEX IDX_DIVISION ON ? (DIVISION)" tableName="PHERF.PHERF_PROD_TEST_UNSALTED"/> + </preScenarioDdls> + + <postScenarioDdls> + <ddl statement="CREATE INDEX IDX_OLDVAL_STRING ON ? (OLDVAL_STRING)" tableName="PHERF.PHERF_PROD_TEST_UNSALTED"/> + <ddl statement="CREATE INDEX IDX_CONNECTION_ID ON ? (CONNECTION_ID)" tableName="PHERF.PHERF_PROD_TEST_UNSALTED"/> + </postScenarioDdls> + + <querySet concurrency="1" executionType="SERIAL" executionDurationInMs="5000" + numberOfExecutions="1"> + <query id="q1" expectedAggregateRowCount="99" statement="select count(*) from PHERF.TEST_TABLE"/> + </querySet> + </scenario> + <!-- To configure a Write Workload to write to a tenant specific view users need to specify the tenantId attribute on the scenario, specifying the tenant they want to write data for as the attribute value. This tells Pherf to take out a @@ -264,8 +282,10 @@ dynamically create the view see comments below with regard to the ddl attribute. --> <scenario tableName="PHERF.TEST_VIEW" tenantId="xyzdefghijklmno" - ddl="CREATE VIEW IF NOT EXISTS PHERF.TEST_VIEW (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" rowCount="100" name="testMTWriteScenario"> + <preScenarioDdls> + <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.TEST_VIEW (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" /> + </preScenarioDdls> </scenario> <!-- Scenario level DDL that is dynamically executed before the Write Workload is run. This pattern is really useful when you want to write data to multi-tenant view and the tenant id is @@ -275,8 +295,10 @@ least once. --> <scenario tableName="PHERF.TEST_MT_VIEW" tenantId="abcdefghijklmno" - ddl="CREATE VIEW IF NOT EXISTS PHERF.TEST_MT_VIEW (field1 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" rowCount="100" name="testMTDdlWriteScenario"> + <preScenarioDdls> + <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.TEST_MT_VIEW (field1 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" /> + </preScenarioDdls> </scenario> </scenarios>