Repository: phoenix Updated Branches: refs/heads/master a0f47c2be -> 813c1c8aa
PHOENIX-4109 Ensure mutations are processed in batches with same time stamp during partial rebuild Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/813c1c8a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/813c1c8a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/813c1c8a Branch: refs/heads/master Commit: 813c1c8aa0f44f0b38ef5bb4c9ed418f693b5c0d Parents: a0f47c2 Author: James Taylor <[email protected]> Authored: Sun Aug 20 16:57:44 2017 -0700 Committer: James Taylor <[email protected]> Committed: Sun Aug 20 16:57:44 2017 -0700 ---------------------------------------------------------------------- .../end2end/index/PartialIndexRebuilderIT.java | 174 ++++++++++++++++--- .../org/apache/phoenix/hbase/index/Indexer.java | 2 +- .../hbase/index/covered/NonTxIndexBuilder.java | 4 +- .../org/apache/phoenix/util/MetaDataUtil.java | 2 +- 4 files changed, 156 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 0dc9a35..f42d2c9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -18,9 +18,9 @@ package org.apache.phoenix.end2end.index; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -29,8 +29,8 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -38,13 +38,10 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PTable; @@ -54,6 +51,7 @@ import org.apache.phoenix.util.EnvironmentEdge; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.Repeat; import org.apache.phoenix.util.RunUntilFailure; @@ -131,7 +129,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } @Test - @Repeat(3) + @Repeat(5) public void testConcurrentUpsertsWithRebuild() throws Throwable { int nThreads = 5; final int batchSize = 200; @@ -145,7 +143,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { Connection conn = DriverManager.getConnection(getUrl()); HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1"); - //addDelayingCoprocessor(conn, tableName); conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)"); final CountDownLatch doneSignal1 = new CountDownLatch(nThreads); @@ -177,6 +174,16 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { return false; } + private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { + PTable table = metaCache.getTableRef(key).getTable(); + for (PTable index : table.getIndexes()) { + if (index.getIndexState() == PIndexState.DISABLE) { + return true; + } + } + return false; + } + private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows, boolean checkForInactive, String fullIndexName) throws SQLException, InterruptedException { PTableKey key = new PTableKey(null,fullTableName); PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); @@ -238,7 +245,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } @Test - @Repeat(3) + @Repeat(5) public void testDeleteAndUpsertAfterFailure() throws Throwable { final int nRows = 10; String schemaName = generateUniqueName(); @@ -261,19 +268,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } - private static void addDelayingCoprocessor(Connection conn, String tableName) throws SQLException, IOException { - int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100; - ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); - descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority, null); - HBaseAdmin admin = services.getAdmin(); - try { - admin.modifyTable(Bytes.toBytes(tableName), descriptor); - } finally { - admin.close(); - } - } - @Test public void testWriteWhileRebuilding() throws Throwable { final int nRows = 10; @@ -498,6 +492,110 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testSeparateTimeBatchesRequired() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + PTableKey key = new PTableKey(null,fullTableName); + final MyClock clock = new MyClock(1000); + EnvironmentEdgeManager.injectEdge(clock); + try (Connection conn = DriverManager.getConnection(getUrl())) { + PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + clock.time += 100; + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); + conn.commit(); + clock.time += 100; + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); + clock.time += 100; + long disableTime = clock.currentTime(); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11')"); + conn.commit(); + clock.time += 100; + assertTrue(hasDisabledIndex(metaCache, key)); + assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); + assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','0')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); + conn.commit(); + clock.time += 100; + IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); + clock.time += 100; + advanceClockUntilPartialRebuildStarts(fullIndexName, clock); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + clock.time += 100; + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + } finally { + EnvironmentEdgeManager.injectEdge(null); + } + } + + @Test + public void testMultiValuesWhenDisableAndInactive() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + PTableKey key = new PTableKey(null,fullTableName); + final MyClock clock = new MyClock(1000); + EnvironmentEdgeManager.injectEdge(clock); + try (Connection conn = DriverManager.getConnection(getUrl())) { + PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + clock.time += 100; + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)"); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')"); + conn.commit(); + clock.time += 100; + try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { + // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering + IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); + clock.time += 100; + long disableTime = clock.currentTime(); + // Set some values while index disabled + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')"); + conn.commit(); + clock.time += 100; + assertTrue(hasDisabledIndex(metaCache, key)); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')"); + conn.commit(); + clock.time += 100; + // Will cause partial index rebuilder to be triggered + IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); + } + final CountDownLatch doneSignal = new CountDownLatch(1); + advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal); + doneSignal.await(30, TimeUnit.SECONDS); + // Set some values while index is in INACTIVE state + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')"); + conn.commit(); + clock.time += WAIT_AFTER_DISABLED; + // Enough time has passed, so rebuild will start now + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + clock.time += 100; + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + } finally { + EnvironmentEdgeManager.injectEdge(null); + } + } + + @Test public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); @@ -563,7 +661,37 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } - private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock) { + @Test + public void testRegionsOnlineCheck() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + PTableKey key = new PTableKey(null,fullTableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a')"); + conn.commit(); + Configuration conf = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); + PTable table = metaCache.getTableRef(key).getTable(); + assertTrue(MetaDataUtil.tableRegionsOnline(conf, table)); + try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + admin.disableTable(fullTableName); + assertFalse(MetaDataUtil.tableRegionsOnline(conf, table)); + admin.enableTable(fullTableName); + } + assertTrue(MetaDataUtil.tableRegionsOnline(conf, table)); + } + } + + private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock) throws InterruptedException { + final CountDownLatch doneSignal = new CountDownLatch(1); + advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal); + clock.time += WAIT_AFTER_DISABLED + 1000; + doneSignal.await(30, TimeUnit.SECONDS); + } + + private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock, final CountDownLatch doneSignal) { Runnable r = new Runnable() { @Override public void run() { @@ -573,7 +701,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { Thread.sleep(1000); clock.time += 1000; } - clock.time += WAIT_AFTER_DISABLED + 1000; + doneSignal.countDown(); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index e19e619..05e2ae7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -521,7 +521,7 @@ public class Indexer extends BaseRegionObserver { miniBatchOp.setWalEdit(0, edit); } - if (copyMutations) { + if (copyMutations || replayWrite != null) { mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index c013b5b..50e2c3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -93,7 +93,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(family); for (KeyValue kv : kvs) { batch.add(kv); - assert(ts == kv.getTimestamp()); + if(ts != kv.getTimestamp()) { + throw new IllegalStateException("Time stamps must match for all cells in a batch"); + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java index b52cb79..502ef37 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java @@ -533,7 +533,7 @@ public class MetaDataUtil { try { hcon = HConnectionManager.getConnection(conf); List<HRegionLocation> locations = hcon.locateRegions( - org.apache.hadoop.hbase.TableName.valueOf(table.getTableName().getBytes())); + org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes())); for (HRegionLocation loc : locations) { try {
