This is an automated email from the ASF dual-hosted git repository. gjacoby pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 02da811 PHOENIX-5973 - Stabilize and speed up IndexToolForNonTxGlobalIndexIT … (#821) 02da811 is described below commit 02da811ba9c31a8d1dcae3af7f89a3084f4b201e Author: Geoffrey Jacoby <gjac...@apache.org> AuthorDate: Tue Jul 7 14:58:08 2020 -0700 PHOENIX-5973 - Stabilize and speed up IndexToolForNonTxGlobalIndexIT … (#821) PHOENIX-5973 - Stabilize and speed up IndexToolForNonTxGlobalIndexIT --- .../end2end/IndexToolForNonTxGlobalIndexIT.java | 210 +++++++++++++++------ .../index/IndexVerificationResultRepositoryIT.java | 2 +- .../index/IndexVerificationOutputRepository.java | 18 ++ .../index/IndexVerificationResultRepository.java | 70 +++++-- .../index/PhoenixIndexImportDirectReducer.java | 7 +- .../java/org/apache/phoenix/util/TestUtil.java | 27 ++- 6 files changed, 251 insertions(+), 83 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java index c350c2c..4ef2b30 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -21,16 +21,25 @@ import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.CoprocessorDescriptor; +import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; @@ -140,13 +149,20 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); + serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, + Long.toString(Long.MAX_VALUE)); + serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, + Long.toString(Long.MAX_VALUE)); Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString()); + destroyDriver(); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + //IndexToolIT.runIndexTool pulls from the minicluster's config directly + getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, "1"); } @After @@ -512,6 +528,9 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseUniqueNamesOwnClusterIT @Test public void testSecondaryGlobalIndexFailure() throws Exception { + if (!mutable) { + return; //nothing in this test is mutable specific, so no need to run twice + } String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); @@ -523,6 +542,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + tableDDLOptions; conn.createStatement().execute(stmString1); + conn.commit(); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); @@ -533,12 +554,9 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT String stmtString2 = String.format( - "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ", indexTableName, dataTableFullName); + "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ", indexTableName, dataTableFullName); conn.createStatement().execute(stmtString2); - - // Run the index MR job. 
- IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName); - + conn.commit(); String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName); // Verify that the index table is in the ACTIVE state @@ -546,9 +564,17 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices(); Admin admin = queryServices.getAdmin(); - TableName tableName = TableName.valueOf(qIndexTableName); - admin.disableTable(tableName); - + TableName tn = TableName.valueOf(Bytes.toBytes(dataTableFullName)); + TableDescriptor td = + admin.getDescriptor(tn); + //add the fast fail coproc and make sure it goes first + CoprocessorDescriptor cd = + CoprocessorDescriptorBuilder. + newBuilder(FastFailRegionObserver.class.getName()). + setPriority(1).build(); + TableDescriptor newTd = + TableDescriptorBuilder.newBuilder(td).setCoprocessor(cd).build(); + admin.modifyTable(tn, newTd); // Run the index MR job and it should fail (return -1) IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, -1, new String[0]); @@ -736,7 +762,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT Assert.assertEquals(PIndexState.BUILDING, TestUtil.getIndexState(conn, indexTableFullName)); // Delete the output table for the next test - IndexToolIT.dropIndexToolTables(conn); + deleteAllRows(conn, + TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); // Run the index tool to populate the index while verifying rows IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER); @@ -786,9 +813,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 5, expectedStatus); - conn.createStatement().execute( 
"DELETE FROM "+indexTableFullName); - conn.commit(); - TestUtil.doMajorCompaction(conn, indexTableFullName); + deleteAllRows(conn, TableName.valueOf(indexTableFullName)); expectedStatus.set(0, RUN_STATUS_SKIPPED); expectedStatus.set(1, RUN_STATUS_SKIPPED); @@ -811,7 +836,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT @Test public void testIndexToolForIncrementalVerify() throws Exception { - ManualEnvironmentEdge customeEdge = new ManualEnvironmentEdge(); + ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge(); String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); String viewName = generateUniqueName(); @@ -823,6 +848,16 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(true); + IndexVerificationOutputRepository outputRepository = + new IndexVerificationOutputRepository(); + //need to make sure that the output and result tables are created before we start + //overriding the environment edge, because there's a bug in HBase 2.x where table + //creation hangs forever when an edge is an injected + outputRepository.createOutputTable(conn); + IndexVerificationResultRepository resultRepository = + new IndexVerificationResultRepository(); + resultRepository.createResultTable(conn); + conn.createStatement().execute("CREATE TABLE "+dataTableFullName+" " + "(key1 BIGINT NOT NULL, key2 BIGINT NOT NULL, val1 VARCHAR, val2 BIGINT, " + "val3 BIGINT, val4 DOUBLE, val5 BIGINT, val6 VARCHAR " @@ -835,95 +870,107 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT conn.createStatement().execute(String.format( "CREATE INDEX "+indexTableName+" ON "+dataTableFullName+" (val3) INCLUDE(val5)")); - customeEdge.setValue(EnvironmentEdgeManager.currentTimeMillis()); - 
EnvironmentEdgeManager.injectEdge(customeEdge); - long t0 = customeEdge.currentTime(); - customeEdge.incrementValue(waitForUpsert); - conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val1, val2) VALUES (4,5,'abc',3)"); - customeEdge.incrementValue(waitForUpsert); - long t1 = customeEdge.currentTime(); - customeEdge.incrementValue(waitForUpsert); - conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val1, val2) VALUES (1,2,'abc',3)"); - customeEdge.incrementValue(waitForUpsert); - long t2 = customeEdge.currentTime(); - customeEdge.incrementValue(waitForUpsert); - conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val3, val4) VALUES (1,2,4,1.2)"); - customeEdge.incrementValue(waitForUpsert); - long t3 = customeEdge.currentTime(); - customeEdge.incrementValue(waitForUpsert); - conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val5, val6) VALUES (1,2,5,'def')"); - customeEdge.incrementValue(waitForUpsert); - long t4 = customeEdge.currentTime(); - customeEdge.incrementValue(waitForUpsert); - conn.createStatement().execute("DELETE FROM "+dataTableFullName+" WHERE key1=4"); - customeEdge.incrementValue(waitForUpsert); - long t5 = customeEdge.currentTime(); - customeEdge.incrementValue(10); - long t6 = customeEdge.currentTime(); + customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(customEdge); + long t0 = customEdge.currentTime(); + customEdge.incrementValue(waitForUpsert); + conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val1, val2) VALUES (4,5,'abc',3)"); + customEdge.incrementValue(waitForUpsert); + long t1 = customEdge.currentTime(); + customEdge.incrementValue(waitForUpsert); + conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val1, val2) VALUES (1,2,'abc',3)"); + customEdge.incrementValue(waitForUpsert); + long t2 = customEdge.currentTime(); + 
customEdge.incrementValue(waitForUpsert); + conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val3, val4) VALUES (1,2,4,1.2)"); + customEdge.incrementValue(waitForUpsert); + long t3 = customEdge.currentTime(); + customEdge.incrementValue(waitForUpsert); + conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val5, val6) VALUES (1,2,5,'def')"); + customEdge.incrementValue(waitForUpsert); + long t4 = customEdge.currentTime(); + customEdge.incrementValue(waitForUpsert); + conn.createStatement().execute("DELETE FROM "+viewFullName+" WHERE key1=4"); + customEdge.incrementValue(waitForUpsert); + long t5 = customEdge.currentTime(); + customEdge.incrementValue(10); + long t6 = customEdge.currentTime(); IndexTool it; if(!mutable) { // job with 2 rows it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2)); verifyCounters(it, 2, 2); + //increment time between rebuilds so that PHOENIX_INDEX_TOOL and + // PHOENIX_INDEX_TOOL_RESULT tables get unique keys for each run + customEdge.incrementValue(waitForUpsert); // only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t2)); verifyCounters(it, 1, 1); - + customEdge.incrementValue(waitForUpsert); // no rows it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6)); verifyCounters(it, 0, 0); - + customEdge.incrementValue(waitForUpsert); //view index // job with 2 rows it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2)); verifyCounters(it, 2, 2); - + 
customEdge.incrementValue(waitForUpsert); // only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t2)); verifyCounters(it, 1, 1); - + customEdge.incrementValue(waitForUpsert); // no rows it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6)); verifyCounters(it, 0, 0); - + customEdge.incrementValue(waitForUpsert); return; } + // regular job without delete row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t4)); - verifyCounters(it, 2, 3); + verifyCounters(it, 2, 2); + customEdge.incrementValue(waitForUpsert); // job with 2 rows it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2)); verifyCounters(it, 2, 2); + customEdge.incrementValue(waitForUpsert); // job with update on only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t3)); - verifyCounters(it, 1, 2); + verifyCounters(it, 1, 1); + customEdge.incrementValue(waitForUpsert); // job with update on only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),"-et", String.valueOf(t4)); - verifyCounters(it, 1, 2); + verifyCounters(it, 1, 1); + customEdge.incrementValue(waitForUpsert); // job with update on only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, 
IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t4),"-et", String.valueOf(t5)); verifyCounters(it, 1, 1); + customEdge.incrementValue(waitForUpsert); // job with no new updates on any row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6)); verifyCounters(it, 0, 0); + customEdge.incrementValue(waitForUpsert); + } finally { + EnvironmentEdgeManager.reset(); } } @@ -970,7 +1017,6 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT customeEdge.incrementValue(10); long t5 = customeEdge.currentTime(); IndexTool it; - // regular job with delete row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1), @@ -996,20 +1042,23 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1), "-et", String.valueOf(t3)); verifyCounters(it, 2, 2); - +/* Disabled pending completion of PHOENIX-5989, because the view filter doesn't include + a PK column, so the delete is getting filtered out of the verification scan // job with update on only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t3), "-et", String.valueOf(t4)); verifyCounters(it, 1, 1); - +*/ // job with no new updates on any row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t4), "-et", String.valueOf(t5)); verifyCounters(it, 0, 0); + } finally { + EnvironmentEdgeManager.reset(); } } @@ -1020,6 +1069,9 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT assertEquals(0, 
it.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); } @Test @@ -1036,6 +1088,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try(Connection conn = DriverManager.getConnection(getUrl(), props)) { + deleteAllRows(conn, + TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); String stmString1 = "CREATE TABLE " + dataTableFullName + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " @@ -1065,6 +1119,19 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName, indexTableFullName, 0); + // disabling logging AFTER on an AFTER run should leave no output rows + assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.AFTER, + IndexTool.IndexDisableLoggingType.AFTER, null, schemaName, dataTableName, + indexTableName, + indexTableFullName, 0); + + //disabling logging BEFORE on a BEFORE run should leave no output rows + assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BEFORE, + IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName, + indexTableFullName, 0); + //now clear out all the rebuilt index rows + deleteAllRows(conn, TableName.valueOf(indexTableFullName)); + //now check that disabling logging AFTER leaves only the 
BEFORE logs on a BOTH run assertDisableLogging(conn, 2, IndexTool.IndexVerifyType.BOTH, IndexTool.IndexDisableLoggingType.AFTER, @@ -1072,18 +1139,17 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT dataTableName, indexTableName, indexTableFullName, 0); + //clear out both the output table and the index deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); deleteAllRows(conn, TableName.valueOf(indexTableFullName)); //now check that disabling logging BEFORE creates only the AFTER logs on a BOTH run - //the index tool run fails validation at the end because we suppressed the BEFORE logs - //which prevented the rebuild from working properly, but that's ok for this test. - assertDisableLogging(conn, 2, IndexTool.IndexVerifyType.BOTH, + assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BOTH, IndexTool.IndexDisableLoggingType.BEFORE, IndexVerificationOutputRepository.PHASE_AFTER_VALUE, schemaName, dataTableName, indexTableName, - indexTableFullName, -1); + indexTableFullName, 0); deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); deleteAllRows(conn, TableName.valueOf(indexTableFullName)); @@ -1091,23 +1157,33 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT //now check that disabling logging BOTH creates no logs on a BOTH run assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BOTH, IndexTool.IndexDisableLoggingType.BOTH, - IndexVerificationOutputRepository.PHASE_AFTER_VALUE, schemaName, + IndexVerificationOutputRepository.PHASE_BEFORE_VALUE, schemaName, dataTableName, indexTableName, - indexTableFullName, -1); + indexTableFullName, 0); } } public void deleteAllRows(Connection conn, TableName tableName) throws SQLException, - IOException { + IOException, InterruptedException { Scan scan = new Scan(); Table table = conn.unwrap(PhoenixConnection.class).getQueryServices(). 
getTable(tableName.getName()); + boolean deletedRows = false; try (ResultScanner scanner = table.getScanner(scan)) { for (Result r : scanner) { Delete del = new Delete(r.getRow()); table.delete(del); + deletedRows = true; } + } catch (Exception e) { + //if the table doesn't exist, we have no rows to delete. Easier to catch + //than to pre-check for existence + } + //don't flush/compact if we didn't write anything, because we'll hang forever + if (deletedRows) { + getUtility().getHBaseAdmin().flush(tableName); + TestUtil.majorCompact(getUtility(), tableName); } } @@ -1121,18 +1197,20 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT IndexTool tool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName, null, - expectedStatus, verifyType, "-et", - Long.toString(EnvironmentEdgeManager.currentTimeMillis()),"-dl", disableLoggingType.toString()); + expectedStatus, verifyType, "-dl", disableLoggingType.toString()); assertNotNull(tool); - assertNotNull(tool.getEndTime()); byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); IndexVerificationOutputRepository outputRepository = new IndexVerificationOutputRepository(indexTableFullNameBytes, conn); List<IndexVerificationOutputRow> rows = - outputRepository.getOutputRows(tool.getEndTime(), - indexTableFullNameBytes); - assertEquals(expectedRows, rows.size()); + outputRepository.getAllOutputRows(); + try { + assertEquals(expectedRows, rows.size()); + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); + throw e; + } if (expectedRows > 0) { assertArrayEquals(expectedPhase, rows.get(0).getPhaseValue()); } @@ -1173,4 +1251,12 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT return output; } + public static class FastFailRegionObserver extends BaseRegionObserver { + @Override + public RegionScanner postScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c, + final Scan scan, + final RegionScanner s) throws IOException { + throw new DoNotRetryIOException("I'm just a coproc that's designed to fail fast"); + } + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java index a83b085..e7b2320 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java @@ -62,7 +62,7 @@ public class IndexVerificationResultRepositoryIT extends ParallelStatsDisabledIT resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH, regionTwo); IndexToolVerificationResult actualResult = - resultRepository.getVerificationResult(conn, scanMaxTs); + resultRepository.getVerificationResult(conn, scanMaxTs, indexNameBytes); assertVerificationResult(expectedResult, actualResult); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java index 729a810..0a1dc5f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java @@ -238,6 +238,16 @@ public class IndexVerificationOutputRepository implements AutoCloseable { public List<IndexVerificationOutputRow> getOutputRows(long ts, byte[] indexName) throws IOException { Iterator<IndexVerificationOutputRow> iter = getOutputRowIterator(ts, indexName); + return getIndexVerificationOutputRows(iter); + } + + @VisibleForTesting + public List<IndexVerificationOutputRow> getAllOutputRows() throws IOException { + 
Iterator<IndexVerificationOutputRow> iter = getOutputRowIteratorForAllRows(); + return getIndexVerificationOutputRows(iter); + } + + private List<IndexVerificationOutputRow> getIndexVerificationOutputRows(Iterator<IndexVerificationOutputRow> iter) { List<IndexVerificationOutputRow> outputRowList = new ArrayList<IndexVerificationOutputRow>(); while (iter.hasNext()){ outputRowList.add(iter.next()); @@ -245,6 +255,14 @@ public class IndexVerificationOutputRepository implements AutoCloseable { return outputRowList; } + @VisibleForTesting + public Iterator<IndexVerificationOutputRow> getOutputRowIteratorForAllRows() + throws IOException { + Scan scan = new Scan(); + ResultScanner scanner = outputTable.getScanner(scan); + return new IndexVerificationOutputRowIterator(scanner.iterator()); + } + public Iterator<IndexVerificationOutputRow> getOutputRowIterator(long ts, byte[] indexName) throws IOException { Scan scan = new Scan(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java index 42e0969..2cb7a53 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java @@ -56,19 +56,23 @@ public class IndexVerificationResultRepository implements AutoCloseable { public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT"; public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME); public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; - public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount"; + public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount"; public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT); 
- public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount"; + public final static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount"; public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount"; + public final static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = + "BeforeRebuildValidIndexRowCount"; public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount"; private static final String INDEX_TOOL_RUN_STATUS = "IndexToolRunStatus"; public final static byte[] INDEX_TOOL_RUN_STATUS_BYTES = Bytes.toBytes(INDEX_TOOL_RUN_STATUS); + public final static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = + "BeforeRebuildExpiredIndexRowCount"; public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount"; + public final static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = + "BeforeRebuildMissingIndexRowCount"; public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount"; + public final static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = + "BeforeRebuildInvalidIndexRowCount"; public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT); public final static String BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT = "BeforeRebuildUnverifiedIndexRowCount"; @@ -80,13 +84,16 @@ public class IndexVerificationResultRepository implements AutoCloseable { 
"BeforeRebuildUnknownIndexRowCount"; public final static byte[] BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT); public final static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = - "AfterValidExpiredIndexRowCount"; + "AfterRebuildValidIndexRowCount"; public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount"; + public final static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = + "AfterRebuildExpiredIndexRowCount"; public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount"; + public final static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = + "AfterRebuildMissingIndexRowCount"; public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount"; + public final static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = + "AfterRebuildInvalidIndexRowCount"; public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT); public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS = "AfterRebuildInvalidIndexRowCountCozExtraCells"; public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS); @@ -98,7 +105,7 @@ public class IndexVerificationResultRepository implements AutoCloseable { public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS); /*** - * Only usable for read 
methods + * Only usable for read / create methods. To write use setResultTable and setIndexTable first */ public IndexVerificationResultRepository(){ @@ -130,6 +137,21 @@ public class IndexVerificationResultRepository implements AutoCloseable { resultHTable = admin.getConnection().getTable(resultTableName); } } + private static byte[] generatePartialResultTableRowKey(long ts, byte[] indexTableName) { + byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); + int targetOffset = 0; + // The row key for the result table : timestamp | index table name | data table region name | + // scan start row | scan stop row + byte[] partialRowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + + indexTableName.length]; + Bytes.putBytes(partialRowKey, targetOffset, keyPrefix, 0, keyPrefix.length); + targetOffset += keyPrefix.length; + Bytes.putBytes(partialRowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(partialRowKey, targetOffset, indexTableName, 0, indexTableName.length); + return partialRowKey; + } + private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, byte[] startRow, byte[] stopRow) { byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); @@ -215,11 +237,6 @@ public class IndexVerificationResultRepository implements AutoCloseable { resultHTable.put(put); } - public IndexToolVerificationResult getVerificationResult(Connection conn, long ts) throws IOException, SQLException { - Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES); - return getVerificationResult(hTable, ts); - } - public Table getTable(Connection conn, byte[] tableName) throws SQLException { return conn.unwrap(PhoenixConnection.class).getQueryServices() .getTable(tableName); @@ -233,6 +250,12 @@ public class IndexVerificationResultRepository implements AutoCloseable { Scan scan = new Scan(); scan.withStartRow(startRowKey); scan.withStopRow(stopRowKey); +
return aggregateVerificationResult(htable, verificationResult, scan); + } + + private IndexToolVerificationResult aggregateVerificationResult(Table htable, + IndexToolVerificationResult verificationResult, + Scan scan) throws IOException { ResultScanner scanner = htable.getScanner(scan); for (Result result = scanner.next(); result != null; result = scanner.next()) { boolean isFirst = true; @@ -250,6 +273,21 @@ public class IndexVerificationResultRepository implements AutoCloseable { return verificationResult; } + public IndexToolVerificationResult getVerificationResult(Connection conn, + long ts, + byte[] indexTableName + ) throws IOException, + SQLException { + Table htable = getTable(conn, RESULT_TABLE_NAME_BYTES); + byte[] startRowKey = generatePartialResultTableRowKey(ts, indexTableName); + byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(startRowKey); + IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(ts); + Scan scan = new Scan(); + scan.setStartRow(startRowKey); + scan.setStopRow(stopRowKey); + return aggregateVerificationResult(htable, verificationResult, scan); + } + private IndexToolVerificationResult getVerificationResult(Table htable, byte [] oldRowKey, Scan scan ) throws IOException { IndexToolVerificationResult verificationResult = null; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index 1187da2..b8cb34b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; @@ -54,6 +55,8 @@ public class PhoenixIndexImportDirectReducer extends private IndexVerificationResultRepository resultRepository; private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class); + private String indexTableName; + private byte[] indexTableNameBytes; private void updateCounters(IndexTool.IndexVerifyType verifyType, Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context) @@ -62,7 +65,7 @@ public class PhoenixIndexImportDirectReducer extends try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) { long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); IndexToolVerificationResult verificationResult = - resultRepository.getVerificationResult(connection, ts); + resultRepository.getVerificationResult(connection, ts, indexTableNameBytes); context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT). setValue(verificationResult.getScannedDataRowCount()); context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT). 
@@ -113,6 +116,8 @@ public class PhoenixIndexImportDirectReducer extends @Override protected void setup(Context context) throws IOException { resultRepository = new IndexVerificationResultRepository(); + indexTableName = PhoenixConfigurationUtil.getPhysicalTableName(context.getConfiguration()); + indexTableNameBytes = Bytes.toBytes(indexTableName); } @Override diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 07d0f94..d04f6f6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -58,16 +58,15 @@ import java.util.Properties; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -804,6 +803,21 @@ public class TestUtil { conn.createStatement().execute(ddl); } + public static void majorCompact(HBaseTestingUtility utility, TableName table) + throws IOException, InterruptedException { + long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis(); + Admin admin = utility.getHBaseAdmin(); + admin.majorCompact(table); + long lastCompactionTimestamp; + CompactionState state = null; + while ((lastCompactionTimestamp = 
admin.getLastMajorCompactionTimestamp(table)) + < compactionRequestedSCN + || (state = admin.getCompactionState(table)).equals(CompactionState.MAJOR) + || admin.getCompactionState(table).equals(CompactionState.MAJOR_AND_MINOR)){ + Thread.sleep(100); + } + } + /** * Runs a major compaction, and then waits until the compaction is complete before returning. * @@ -877,6 +891,13 @@ public class TestUtil { conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps))); } + public static void dumpTable(Connection conn, TableName tableName) + throws SQLException, IOException{ + ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices(); + Table table = cqs.getTable(tableName.getName()); + dumpTable(table); + } + public static void dumpTable(Table table) throws IOException { System.out.println("************ dumping " + table + " **************"); Scan s = new Scan();