PHOENIX-3585: MutableIndexIT tests testSplitDuringIndexScan and testIndexHalfStoreFileReader fail for transactional tables and local indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1e2a9675 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1e2a9675 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1e2a9675 Branch: refs/heads/calcite Commit: 1e2a9675c68f2ea52cf0d7fd3dc6dcff585b02cd Parents: 7201dd5 Author: Thomas D'Silva <[email protected]> Authored: Fri Feb 10 14:10:52 2017 -0800 Committer: Thomas <[email protected]> Committed: Wed Mar 1 15:04:12 2017 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/index/MutableIndexIT.java | 81 ++++++++++---------- .../IndexHalfStoreFileReaderGenerator.java | 12 ++- 2 files changed, 50 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1e2a9675/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 56e5bf4..424099d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -41,10 +41,17 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import 
org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.util.ByteUtil; @@ -620,13 +627,11 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } @Test - @Ignore //TODO remove after PHOENIX-3585 is fixed public void testSplitDuringIndexScan() throws Exception { testSplitDuringIndexScan(false); } @Test - @Ignore //TODO remove after PHOENIX-3585 is fixed public void testSplitDuringIndexReverseScan() throws Exception { testSplitDuringIndexScan(true); } @@ -685,10 +690,10 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } @Test - @Ignore //TODO remove after PHOENIX-3585 is fixed public void testIndexHalfStoreFileReader() throws Exception { Connection conn1 = getConnection(); - HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + ConnectionQueryServices connectionQueryServices = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES); + HBaseAdmin admin = connectionQueryServices.getAdmin(); String tableName = "TBL_" + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); try { @@ -700,55 +705,53 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')"); conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')"); conn1.commit(); + String query = "SELECT count(*) FROM " + tableName +" where v1<='z'"; ResultSet rs = conn1.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals(4, rs.getInt(1)); - TableName table = TableName.valueOf(localIndex?tableName: indexName); TableName indexTable = TableName.valueOf(localIndex?tableName: indexName); admin.flush(indexTable); boolean 
merged = false; + HTableInterface table = connectionQueryServices.getTable(indexTable.getName()); // merge regions until 1 left - end: while (true) { - long numRegions = 0; - while (true) { - rs = conn1.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results? - try { - List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable); - numRegions = indexRegions.size(); - if (numRegions==1) { - break end; - } - if(!merged) { - List<HRegionInfo> regions = - admin.getTableRegions(localIndex ? table : indexTable); - Log.info("Merging: " + regions.size()); - admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), - regions.get(1).getEncodedNameAsBytes(), false); - merged = true; - Threads.sleep(10000); - } + long numRegions = 0; + while (true) { + rs = conn1.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results? 
+ try { + List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable); + numRegions = indexRegions.size(); + if (numRegions==1) { break; - } catch (Exception ex) { - Log.info(ex); } - if(!localIndex) { - long waitStartTime = System.currentTimeMillis(); - // wait until merge happened - while (System.currentTimeMillis() - waitStartTime < 10000) { - List<HRegionInfo> regions = admin.getTableRegions(indexTable); - Log.info("Waiting:" + regions.size()); - if (regions.size() < numRegions) { - break; - } - Threads.sleep(1000); - } + if(!merged) { + List<HRegionInfo> regions = + admin.getTableRegions(indexTable); + Log.info("Merging: " + regions.size()); + admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), + regions.get(1).getEncodedNameAsBytes(), false); + merged = true; + Threads.sleep(10000); + } + } catch (Exception ex) { + Log.info(ex); + } + long waitStartTime = System.currentTimeMillis(); + // wait until merge happened + while (System.currentTimeMillis() - waitStartTime < 10000) { + List<HRegionInfo> regions = admin.getTableRegions(indexTable); + Log.info("Waiting:" + regions.size()); + if (regions.size() < numRegions) { + break; } + Threads.sleep(1000); } + SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(), indexTable); + assertTrue("Index table should be online ", admin.isTableAvailable(indexTable)); } } finally { dropTable(admin, conn1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1e2a9675/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java index a8ebe75..1e9151a 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java +++ 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java @@ -185,10 +185,14 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { - - if (!IndexUtil.isLocalIndexStore(store) || s != null) { return s; } - Scan scan = new Scan(); - scan.setMaxVersions(store.getFamily().getMaxVersions()); + if (!IndexUtil.isLocalIndexStore(store)) { return s; } + Scan scan = null; + if (s!=null) { + scan = ((StoreScanner)s).scan; + } else { + scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + } if (!store.hasReferences()) { InternalScanner repairScanner = null; if (request.isMajor() && (!RepairUtil.isLocalIndexStoreFilesConsistent(c.getEnvironment(), store))) {
