PHOENIX-4178 Detect failed index write while rebuilder is running with index staying active
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d047fae9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d047fae9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d047fae9 Branch: refs/heads/4.x-HBase-0.98 Commit: d047fae97423b51f4b40f2b1793b575af8de451c Parents: afe1e5f Author: James Taylor <[email protected]> Authored: Thu Sep 21 18:24:39 2017 -0700 Committer: James Taylor <[email protected]> Committed: Fri Sep 22 22:00:44 2017 -0700 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 12 +- .../end2end/index/PartialIndexRebuilderIT.java | 459 +++++++++++-------- .../coprocessor/MetaDataEndpointImpl.java | 179 ++++---- .../phoenix/coprocessor/MetaDataProtocol.java | 6 +- .../coprocessor/MetaDataRegionObserver.java | 80 +--- .../apache/phoenix/execute/MutationState.java | 10 +- .../apache/phoenix/index/IndexMaintainer.java | 25 +- .../index/PhoenixIndexFailurePolicy.java | 12 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 + .../apache/phoenix/optimize/QueryOptimizer.java | 6 +- .../phoenix/query/QueryServicesOptions.java | 16 +- .../org/apache/phoenix/schema/PIndexState.java | 3 +- .../java/org/apache/phoenix/util/IndexUtil.java | 14 +- .../phoenix/query/QueryServicesTestImpl.java | 10 +- .../java/org/apache/phoenix/util/TestUtil.java | 41 +- 15 files changed, 496 insertions(+), 378 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d047fae9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index a24e93f..0163620 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -36,7 +37,6 @@ import java.util.Properties; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -138,12 +138,12 @@ public class MutableIndexFailureIT extends BaseTest { // need to override rpc retries otherwise test doesn't pass serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, Long.toString(numRpcRetries)); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(forwardOverlapMs)); + serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs)); /* * Effectively disable running the index rebuild task by having an infinite delay * because we want to control it's execution ourselves */ serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE)); - serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs)); Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); NUM_SLAVES_BASE = 4; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); @@ -299,10 +299,10 @@ public class MutableIndexFailureIT extends BaseTest { assertTrue(rs.next()); assertEquals(indexName, rs.getString(3)); // the index is only disabled for non-txn tables upon index table write failure + String indexState = rs.getString("INDEX_STATE"); if (transactional || leaveIndexActiveOnFailure) { - assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + assertTrue(PIndexState.ACTIVE.toString().equals(indexState) || PIndexState.PENDING_ACTIVE.toString().equals(indexState)); } else { - String indexState = rs.getString("INDEX_STATE"); assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState)); } assertFalse(rs.next()); @@ -311,7 +311,7 @@ public class MutableIndexFailureIT extends BaseTest { // in an all or none manner. If the table is not transactional, then the data writes // would have succeeded while the index writes would have failed. if (!transactional) { - updateTableAgain(conn, leaveIndexActiveOnFailure); + updateTableAgain(conn, false); // Verify previous writes succeeded to data table query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); @@ -518,7 +518,7 @@ public class MutableIndexFailureIT extends BaseTest { public static final String FAIL_TABLE_NAME = "FAIL_TABLE"; @Override - public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { boolean throwException = false; if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME) && FAIL_WRITE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d047fae9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index dfe5a28..9095dbe 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.Map; import java.util.Random; @@ -32,7 +33,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -40,11 +41,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; @@ -64,23 +67,68 @@ import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; +@SuppressWarnings("deprecation") @RunWith(RunUntilFailure.class) public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { + private static final Logger LOG = LoggerFactory.getLogger(PartialIndexRebuilderIT.class); private static final Random RAND = new Random(5); private static final int WAIT_AFTER_DISABLED = 5000; - private static final int REBUILD_INTERVAL = 2000; + private static final long REBUILD_PERIOD = 50000; + private static final long REBUILD_INTERVAL = 2000; + private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; + @BeforeClass public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL)); - serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "120000"); // give up rebuilding after 2 minutes + serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "300000"); // give up rebuilding after 5 minutes + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED)); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); + indexRebuildTaskRegionEnvironment = + (RegionCoprocessorEnvironment) getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); + MetaDataRegionObserver.initRebuildIndexConnectionProps( + indexRebuildTaskRegionEnvironment.getConfiguration()); + } + + private static void runIndexRebuilder() throws InterruptedException, SQLException { + BuildIndexScheduleTask task = + new MetaDataRegionObserver.BuildIndexScheduleTask( + indexRebuildTaskRegionEnvironment); + task.run(); + } + + private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (!cancel[0]) { + try { + runIndexRebuilder(); + Thread.sleep(interval); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } catch (SQLException e) { + LOG.error(e.getMessage(),e); + } + } + } + }); + thread.setDaemon(true); + thread.start(); } private static void mutateRandomly(final String fullTableName, final int nThreads, final int nRows, final int nIndexValues, final int batchSize, final CountDownLatch doneSignal) { @@ -134,7 +182,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } @Test - @Repeat(5) public void testConcurrentUpsertsWithRebuild() throws Throwable { int nThreads = 5; final int batchSize = 200; @@ -155,12 +202,17 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { assertTrue("Ran out of time", doneSignal1.await(120, TimeUnit.SECONDS)); IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE); - do { - final CountDownLatch doneSignal2 = new CountDownLatch(nThreads); - mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2); - assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS)); - } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); - + boolean[] cancel = new boolean[1]; + try { + do { + final CountDownLatch doneSignal2 = new CountDownLatch(nThreads); + runIndexRebuilderAsync(500,cancel); + mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2); + assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS)); + } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); + } finally { + cancel[0] = true; + } long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); assertEquals(nRows, actualRowCount); } @@ -172,17 +224,21 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { PTable table = metaCache.getTableRef(key).getTable(); for (PTable index : table.getIndexes()) { - if (index.getIndexState() == PIndexState.ACTIVE) { + if (index.getIndexState() == PIndexState.INACTIVE) { return true; } } return false; } - + private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { + return hasIndexWithState(metaCache, key, PIndexState.DISABLE); + } + + private static boolean hasIndexWithState(PMetaData metaCache, PTableKey key, PIndexState expectedState) throws TableNotFoundException { PTable table = metaCache.getTableRef(key).getTable(); for (PTable index : table.getIndexes()) { - if (index.getIndexState() == PIndexState.DISABLE) { + if (index.getIndexState() == expectedState) { return true; } } @@ -304,8 +360,14 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { long disableTS = EnvironmentEdgeManager.currentTimeMillis(); HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); - mutateRandomly(conn, fullTableName, nRows); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + boolean[] cancel = new boolean[1]; + try { + runIndexRebuilderAsync(500,cancel); + mutateRandomly(conn, fullTableName, nRows); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + } finally { + cancel[0] = true; + } long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); assertEquals(nRows,actualRowCount); @@ -346,12 +408,16 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { t.start(); long disableTS = EnvironmentEdgeManager.currentTimeMillis(); IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - doneSignal.await(60, TimeUnit.SECONDS); + boolean[] cancel = new boolean[1]; + try { + runIndexRebuilderAsync(500,cancel); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + doneSignal.await(60, TimeUnit.SECONDS); + } finally { + cancel[0] = true; + } assertTrue(hasInactiveIndex[0]); - TestUtil.dumpIndexStatus(conn, fullIndexName); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); assertEquals(nRows,actualRowCount); @@ -379,7 +445,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')"); conn.commit(); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + runIndexRebuilder(); + Thread.sleep(WAIT_AFTER_DISABLED); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } @@ -406,7 +475,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); conn.commit(); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + runIndexRebuilder(); + Thread.sleep(WAIT_AFTER_DISABLED); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } @@ -433,7 +505,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); conn.commit(); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + runIndexRebuilder(); + Thread.sleep(WAIT_AFTER_DISABLED); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } @@ -458,7 +533,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); conn.createStatement().execute("DELETE FROM " + fullTableName); conn.commit(); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + runIndexRebuilder(); + Thread.sleep(WAIT_AFTER_DISABLED); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } @@ -483,7 +561,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')"); conn.commit(); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + runIndexRebuilder(); + Thread.sleep(WAIT_AFTER_DISABLED); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } @@ -526,9 +607,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); conn.commit(); clock.time += 1000; - advanceClockUntilPartialRebuildStarts(fullIndexName, clock); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - clock.time += 100; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } finally { EnvironmentEdgeManager.injectEdge(null); @@ -536,7 +619,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } @Test - public void testSeparateTimeBatchesRequired() throws Throwable { + public void testTimeBatchesInCoprocessorRequired() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); String indexName = generateUniqueName(); @@ -572,9 +655,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { clock.time += 100; IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); clock.time += 100; - advanceClockUntilPartialRebuildStarts(fullIndexName, clock); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - clock.time += 100; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } finally { EnvironmentEdgeManager.injectEdge(null); @@ -582,7 +667,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } @Test - public void testMultiValuesWhenDisableAndInactive() throws Throwable { + public void testBatchingDuringRebuild() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); String indexName = generateUniqueName(); @@ -593,55 +678,45 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { EnvironmentEdgeManager.injectEdge(clock); try (Connection conn = DriverManager.getConnection(getUrl())) { PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); clock.time += 100; - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); conn.commit(); clock.time += 100; - try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { - // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering - IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); - clock.time += 100; - long disableTime = clock.currentTime(); - // Set some values while index disabled - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')"); - conn.commit(); - clock.time += 100; - assertTrue(hasDisabledIndex(metaCache, key)); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')"); - conn.commit(); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')"); - conn.commit(); - clock.time += 100; - // Will cause partial index rebuilder to be triggered - IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); - } - final CountDownLatch doneSignal = new CountDownLatch(1); - advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal); - doneSignal.await(30, TimeUnit.SECONDS); - // Set some values while index is in INACTIVE state + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + long disableTime = clock.currentTime(); + IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')"); conn.commit(); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')"); + clock.time += REBUILD_PERIOD; + assertTrue(hasDisabledIndex(metaCache, key)); + assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); + assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')"); conn.commit(); - clock.time += WAIT_AFTER_DISABLED; - // Enough time has passed, so rebuild will start now - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + assertEquals(3,TestUtil.getRowCount(conn, fullTableName)); + assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); clock.time += 100; + + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + runIndexRebuilder(); + assertEquals(2,TestUtil.getRowCount(conn, fullIndexName)); + + clock.time += REBUILD_PERIOD; + runIndexRebuilder(); + // Verify that other batches were processed + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } finally { EnvironmentEdgeManager.injectEdge(null); } } - - private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1); - private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1); - + @Test public void testUpperBoundSetOnRebuild() throws Throwable { String schemaName = generateUniqueName(); @@ -665,7 +740,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a', '0')"); conn.commit(); // Set clock forward in time past the "overlap" amount we wait for index maintenance to kick in - clock.time += 10 * WAIT_AFTER_DISABLED; + clock.time += 2 * WAIT_AFTER_DISABLED; assertTrue(hasDisabledIndex(metaCache, key)); assertEquals(1,TestUtil.getRowCount(conn, fullTableName)); assertEquals(0,TestUtil.getRowCount(conn, fullIndexName)); @@ -676,9 +751,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { // Set clock back in time and start rebuild clock.time = disableTime + 100; IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); - advanceClockUntilPartialRebuildStarts(fullIndexName, clock); - clock.time += REBUILD_INTERVAL; - waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE, clock, REBUILD_INTERVAL); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); // If an upper bound was set on the rebuilder, we should only have found one row assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); @@ -687,21 +764,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } - private static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState, MyClock clock, long increment) throws InterruptedException, SQLException { - int maxTries = 60, nTries = 0; - do { - Thread.sleep(1000); // sleep 1 sec - clock.time += increment; - if (TestUtil.checkIndexState(conn, fullIndexName, expectedIndexState, 0L)) { - return; - } - } while (++nTries < maxTries); - fail("Ran out of time waiting for index state to become " + expectedIndexState); - } - - @Test - public void testDisableIndexDuringRebuild() throws Throwable { + public void testMultiValuesWhenDisableAndInactive() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); String indexName = generateUniqueName(); @@ -737,47 +801,21 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { clock.time += 100; // Will cause partial index rebuilder to be triggered IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); - final CountDownLatch doneSignal = new CountDownLatch(1); - advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal); - // Set some values while index is in INACTIVE state - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')"); - conn.commit(); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')"); - conn.commit(); - doneSignal.await(30, TimeUnit.SECONDS); - // Install coprocessor that will simulate an index write failure during index rebuild - TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class); - clock.time += WAIT_AFTER_DISABLED; - doneSignal.await(30, TimeUnit.SECONDS); - WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS); - // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering - IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); - clock.time += 100; - disableTime = clock.currentTime(); - // Set some values while index disabled - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')"); - conn.commit(); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')"); - conn.commit(); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')"); - conn.commit(); - clock.time += 100; - // Simulates another write failure. Should cause current run of rebuilder to fail and retry again later - IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); - removeWriteFailingCoprocessor(conn,fullIndexName); - WAIT_FOR_INDEX_WRITE.countDown(); } - // Original rebuilder should have failed - - advanceClockUntilPartialRebuildStarts(fullIndexName, clock); - clock.time += WAIT_AFTER_DISABLED * 2; - // Enough time has passed, so rebuild will start now - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + + // Set some values while index is in INACTIVE state clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')"); + conn.commit(); + clock.time += WAIT_AFTER_DISABLED; + // Enough time has passed, so rebuild will start now + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } finally { EnvironmentEdgeManager.injectEdge(null); @@ -785,6 +823,100 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testIndexWriteFailureDisablingIndex() throws Throwable { + testIndexWriteFailureDuringRebuild(PIndexState.DISABLE); + } + + @Test + public void testIndexWriteFailureLeavingIndexActive() throws Throwable { + testIndexWriteFailureDuringRebuild(PIndexState.PENDING_ACTIVE); + } + + private void testIndexWriteFailureDuringRebuild(PIndexState indexStateOnFailure) throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + PTableKey key = new PTableKey(null,fullTableName); + final MyClock clock = new MyClock(1000); + EnvironmentEdgeManager.injectEdge(clock); + try (Connection conn = DriverManager.getConnection(getUrl())) { + PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = " + (indexStateOnFailure == PIndexState.DISABLE)); + clock.time += 100; + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); + conn.commit(); + clock.time += 100; + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + + long disableTime = clock.currentTime(); + // Simulates an index write failure + IndexUtil.updateIndexState(fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? disableTime : -disableTime, metaTable, indexStateOnFailure); + + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')"); + conn.commit(); + + // Large enough to be in separate time batch + clock.time += 2 * REBUILD_PERIOD; + assertTrue(hasIndexWithState(metaCache, key, indexStateOnFailure)); + assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); + assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')"); + conn.commit(); + assertEquals(3,TestUtil.getRowCount(conn, fullTableName)); + assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); + clock.time += 100; + + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + + // First batch should have been processed + runIndexRebuilder(); + assertEquals(2,TestUtil.getRowCount(conn, fullIndexName)); + + // Simulate write failure + TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('dddd','dddd','3333')"); + try { + conn.commit(); + fail(); + } catch (CommitException e) { + // Expected + } + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure, null)); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); + assertTrue(rs.next()); + assertEquals("0", rs.getString(1)); + assertEquals(indexStateOnFailure == PIndexState.DISABLE ? fullTableName : fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); + TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + + // First batch should have been processed again because we started over + runIndexRebuilder(); + assertEquals(3,TestUtil.getRowCount(conn, fullIndexName)); + + clock.time += 2 * REBUILD_PERIOD; + // Second batch should have been processed now + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); + + // Verify that other batches were processed + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + } finally { + EnvironmentEdgeManager.injectEdge(null); + } + } + + @Test public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); @@ -808,9 +940,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); conn.commit(); clock.time += 1000; - advanceClockUntilPartialRebuildStarts(fullIndexName, clock); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - clock.time += 100; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } finally { EnvironmentEdgeManager.injectEdge(null); @@ -841,9 +975,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'"); conn.commit(); clock.time += 1000; - advanceClockUntilPartialRebuildStarts(fullIndexName, clock); - TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - clock.time += 100; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); + clock.time += WAIT_AFTER_DISABLED; + runIndexRebuilder(); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } finally { EnvironmentEdgeManager.injectEdge(null); @@ -873,65 +1009,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } - private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock) throws InterruptedException { - final CountDownLatch doneSignal = new CountDownLatch(1); - advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal); - clock.time += WAIT_AFTER_DISABLED + 1000; - doneSignal.await(30, TimeUnit.SECONDS); - } - - private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock, final CountDownLatch doneSignal) { - Runnable r = new Runnable() { - @Override - public void run() { - try (Connection conn = DriverManager.getConnection(getUrl())) { - int nTries = 10; - while (--nTries >0 && !TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)) { - Thread.sleep(1000); - clock.time += 1000; - } - doneSignal.countDown(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - Thread t = new Thread(r); - t.setDaemon(true); - t.start(); - } - - private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception { - ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); - descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName()); - int numTries = 10; - try (HBaseAdmin admin = services.getAdmin()) { - admin.modifyTable(Bytes.toBytes(tableName), descriptor); - while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) - && numTries > 0) { - numTries--; - if (numTries == 0) { - throw new Exception( - "Check to detect if delaying co-processor was removed failed after " - + numTries + " retries."); - } - Thread.sleep(1000); - } - } - } - public static class WriteFailingRegionObserver extends SimpleRegionObserver { @Override public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - WAIT_FOR_REBUILD_TO_START.countDown(); - try { - WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.interrupted(); - throw new IOException(e); - } + throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString()); } } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d047fae9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 04e1604..7e9450b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -514,7 +514,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } long currentTime = EnvironmentEdgeManager.currentTimeMillis(); - PTable table = doGetTable(key, request.getClientTimestamp()); + PTable table = doGetTable(key, request.getClientTimestamp(), request.getClientVersion()); if (table == null) { builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND); builder.setMutationTime(currentTime); @@ -526,7 +526,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE; for (PTable index : table.getIndexes()) { disableIndexTimestamp = index.getIndexDisableTimestamp(); - if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { + if (disableIndexTimestamp > 0 && (index.getIndexState() == PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { minNonZerodisableIndexTimestamp = disableIndexTimestamp; } } @@ -553,7 +553,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, - long clientTimeStamp) throws IOException, SQLException { + long clientTimeStamp, int clientVersion) throws IOException, SQLException { Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); try (RegionScanner scanner = region.getScanner(scan)) { @@ -562,7 +562,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTable newTable; boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); - newTable = getTable(scanner, clientTimeStamp, tableTimeStamp); + newTable = getTable(scanner, clientTimeStamp, tableTimeStamp, clientVersion); if (newTable == null) { return null; } @@ -645,9 +645,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException { + private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes, int clientVersion) throws IOException, SQLException { byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes()); - PTable indexTable = doGetTable(key, clientTimeStamp); + PTable indexTable = doGetTable(key, clientTimeStamp, clientVersion); if (indexTable == null) { ServerUtil.throwIOException("Index not found", new TableNotFoundException(schemaName.getString(), indexName.getString())); return; @@ -795,7 +795,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso arguments.add(arg); } - private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp) + private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp, int clientVersion) throws IOException, SQLException { List<Cell> results = Lists.newArrayList(); scanner.next(results); @@ -896,6 +896,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PIndexState indexState = indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv .getValueArray()[indexStateKv.getValueOffset()]); + // If client is not yet up to 4.12, then translate PENDING_ACTIVE to ACTIVE (as would have been + // the value in those versions) since the client won't have this index state in its enum. + if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) { + indexState = PIndexState.ACTIVE; + } Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX]; boolean isImmutableRows = immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject( @@ -981,7 +986,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength())==0) { LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); if (linkType == LinkType.INDEX_TABLE) { - addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); + addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion); } else if (linkType == LinkType.PHYSICAL_TABLE) { physicalTables.add(famName); } else if (linkType == LinkType.PARENT_TABLE) { @@ -1247,13 +1252,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, - ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) + ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, int clientVersion) throws IOException, SQLException { HRegion region = env.getRegion(); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); // We always cache the latest version - fault in if not in cache - if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) { + if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp, clientVersion)) != null) { return table; } // if not found then check if newer table already exists and add delete marker for timestamp @@ -1339,6 +1344,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] schemaName = null; byte[] tableName = null; try { + int clientVersion = request.getClientVersion(); List<Mutation> tableMetadata = ProtobufUtil.getMutations(request); MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData); byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; @@ -1400,7 +1406,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso acquireLock(region, parentTableKey, locks); parentCacheKey = new ImmutableBytesPtr(parentTableKey); parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp, - clientTimeStamp); + clientTimeStamp, clientVersion); if (parentTable == null || isTableDeleted(parentTable)) { builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND); builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); @@ -1433,7 +1439,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Get as of latest timestamp so we can detect if we have a newer table that already // exists without making an additional query PTable table = - loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP); + loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP, clientVersion); if (table != null) { if (table.getTimeStamp() < clientTimeStamp) { // If the table is older than the client time stamp and it's deleted, @@ -1639,8 +1645,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } private void findAllChildViews(HRegion region, byte[] tenantId, PTable table, - TableViewFinder result, long clientTimeStamp) throws IOException, SQLException { - TableViewFinder currResult = findChildViews(region, tenantId, table); + TableViewFinder result, long clientTimeStamp, int clientVersion) throws IOException, SQLException { + TableViewFinder currResult = findChildViews(region, tenantId, table, clientVersion); result.addResult(currResult); for (ViewInfo viewInfo : currResult.getViewInfoList()) { byte[] viewtenantId = viewInfo.getTenantId(); @@ -1648,8 +1654,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] viewTable = viewInfo.getViewName(); byte[] tableKey = SchemaUtil.getTableKey(viewtenantId, viewSchema, viewTable); ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey); - PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp); - findAllChildViews(region, viewtenantId, view, result, clientTimeStamp); + PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp, clientVersion); + findAllChildViews(region, viewtenantId, view, result, clientTimeStamp, clientVersion); } } @@ -1773,7 +1779,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] { PTable.LinkType.PHYSICAL_TABLE.getSerializedValue() }; - private TableViewFinder findChildViews(HRegion region, byte[] tenantId, PTable table) + private TableViewFinder findChildViews(HRegion region, byte[] tenantId, PTable table, int clientVersion) throws IOException, SQLException { byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, @@ -1782,7 +1788,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey); PTable systemCatalog = loadTable(env, tableKey, cacheKey, MIN_SYSTEM_TABLE_TIMESTAMP, - HConstants.LATEST_TIMESTAMP); + HConstants.LATEST_TIMESTAMP, clientVersion); if (systemCatalog.getTimeStamp() < MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) { return findChildViews_deprecated(region, tenantId, table, PHYSICAL_TABLE_BYTES); } else { @@ -1840,7 +1846,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso result = doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName, PTableType.fromSerializedValue(tableType), tableMetadata, - invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade); + invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade, request.getClientVersion()); if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { done.run(MetaDataMutationResult.toProto(result)); return; @@ -1871,7 +1877,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete, List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, - List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade) throws IOException, SQLException { + List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion) throws IOException, SQLException { long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete); @@ -1884,7 +1890,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // We always cache the latest version - fault in if not in cache if (table != null - || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) { + || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) != null) { if (table.getTimeStamp() < clientTimeStamp) { if (isTableDeleted(table) || tableType != table.getType()) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null); @@ -1921,7 +1927,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) { // Handle any child views that exist - TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table); + TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table, clientVersion); if (tableViewFinderResult.hasViews()) { if (isCascade) { if (tableViewFinderResult.allViewsInMultipleRegions()) { @@ -1941,7 +1947,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso acquireLock(region, viewKey, locks); MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName, viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks, - tableNamesToDelete, sharedTablesToDelete, false); + tableNamesToDelete, sharedTablesToDelete, false, clientVersion); if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; } } } @@ -2004,7 +2010,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso acquireLock(region, indexKey, locks); MetaDataMutationResult result = doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX, - rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false); + rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion); if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; } @@ -2022,7 +2028,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } private MetaDataMutationResult - mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException { + mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator, int clientVersion) throws IOException { byte[][] rowKeyMetaData = new byte[5][]; MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData); byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; @@ -2056,7 +2062,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Get client timeStamp from mutations long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); if (table == null - && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) { + && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) == null) { // if not found then call newerTableExists and add delete marker for timestamp // found table = buildDeletedTable(key, cacheKey, region, clientTimeStamp); @@ -2127,7 +2133,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (result !=null) { return result; } else { - table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP); + table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion); return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table); } } finally { @@ -2286,7 +2292,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private MetaDataMutationResult addColumnsAndTablePropertiesToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName, List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinder childViewsResult, - HRegion region, List<RowLock> locks) throws IOException, SQLException { + HRegion region, List<RowLock> locks, int clientVersion) throws IOException, SQLException { List<PutWithOrdinalPosition> columnPutsForBaseTable = Lists.newArrayListWithExpectedSize(tableMetadata.size()); Map<TableProperty, Cell> tablePropertyCellMap = Maps.newHashMapWithExpectedSize(tableMetadata.size()); // Isolate the puts relevant to adding columns. Also figure out what kind of columns are being added. @@ -2333,7 +2339,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // lock the rows corresponding to views so that no other thread can modify the view meta-data RowLock viewRowLock = acquireLock(region, viewKey, locks); - PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock); + PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion); ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList(); List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns()); @@ -2632,7 +2638,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTable basePhysicalTable, List<RowLock> locks, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName, List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, - TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete) + TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion) throws IOException, SQLException { List<Delete> columnDeletesForBaseTable = new ArrayList<>(tableMetadata.size()); // Isolate the deletes relevant to dropping columns. Also figure out what kind of columns @@ -2658,7 +2664,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // lock the rows corresponding to views so that no other thread can modify the view // meta-data RowLock viewRowLock = acquireLock(region, viewKey, locks); - PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock); + PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion); ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList(); @@ -2729,7 +2735,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso dropIndexes(view, region, invalidateList, locks, clientTimeStamp, schemaName, view.getName().getBytes(), mutationsForAddingColumnsToViews, existingViewColumn, - tableNamesToDelete, sharedTablesToDelete); + tableNamesToDelete, sharedTablesToDelete, clientVersion); } } @@ -2951,7 +2957,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size())); if (type == PTableType.TABLE || type == PTableType.SYSTEM) { TableViewFinder childViewsResult = new TableViewFinder(); - findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp); + findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion()); if (childViewsResult.hasViews()) { /* * Dis-allow if: @@ -2976,7 +2982,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } else { mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getViewInfoList().size() * tableMetaData.size()); MetaDataMutationResult mutationResult = addColumnsAndTablePropertiesToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp, - childViewsResult, region, locks); + childViewsResult, region, locks, request.getClientVersion()); // return if we were not able to add the column successfully if (mutationResult!=null) return mutationResult; @@ -3067,7 +3073,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso tableMetaData.addAll(mutationsForAddingColumnsToViews); return null; } - }); + }, request.getClientVersion()); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); } @@ -3078,11 +3084,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException { - return doGetTable(key, clientTimeStamp, null); + private PTable doGetTable(byte[] key, long clientTimeStamp, int clientVersion) throws IOException, SQLException { + return doGetTable(key, clientTimeStamp, null, clientVersion); } - private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException { + private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock, int clientVersion) throws IOException, SQLException { ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -3134,13 +3140,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return table; } // Query for the latest table first, since it's not cached - table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP); + table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion); if ((table != null && table.getTimeStamp() < clientTimeStamp) || (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) { return table; } // Otherwise, query for an older version of the table - it won't be cached - return buildTable(key, cacheKey, region, clientTimeStamp); + return buildTable(key, cacheKey, region, clientTimeStamp, clientVersion); } finally { if (!wasLocked) rowLock.release(); } @@ -3207,7 +3213,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } @Override - public void dropColumn(RpcController controller, DropColumnRequest request, + public void dropColumn(RpcController controller, final DropColumnRequest request, RpcCallback<MetaDataResponse> done) { List<Mutation> tableMetaData = null; final List<byte[]> tableNamesToDelete = Lists.newArrayList(); @@ -3229,13 +3235,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTableType type = table.getType(); if (type == PTableType.TABLE || type == PTableType.SYSTEM) { TableViewFinder childViewsResult = new TableViewFinder(); - findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp); + findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion()); if (childViewsResult.hasViews()) { MetaDataMutationResult mutationResult = dropColumnsFromChildViews(region, table, locks, tableMetaData, additionalTableMetaData, schemaName, tableName, invalidateList, - clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete); + clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete, request.getClientVersion()); // return if we were not able to drop the column successfully if (mutationResult != null) return mutationResult; } @@ -3293,7 +3299,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso dropIndexes(table, region, invalidateList, locks, clientTimeStamp, schemaName, tableName, additionalTableMetaData, columnToDelete, - tableNamesToDelete, sharedTablesToDelete); + tableNamesToDelete, sharedTablesToDelete, request.getClientVersion()); } catch (ColumnFamilyNotFoundException e) { return new MetaDataMutationResult( MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager @@ -3316,7 +3322,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso long currentTime = MetaDataUtil.getClientTimeStamp(tableMetaData); return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, tableNamesToDelete, sharedTablesToDelete); } - }); + }, request.getClientVersion()); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); } @@ -3330,7 +3336,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private void dropIndexes(PTable table, HRegion region, List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp, byte[] schemaName, byte[] tableName, List<Mutation> additionalTableMetaData, PColumn columnToDelete, - List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete) + List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion) throws IOException, SQLException { // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the // index and then invalidate it @@ -3369,7 +3375,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp)); doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index .getTableName().getBytes(), tableName, index.getType(), - additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false); + additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion); invalidateList.add(new ImmutableBytesPtr(indexKey)); } // If the dropped column is a covered index column, invalidate the index @@ -3435,7 +3441,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } - long timeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); + long timeStamp = HConstants.LATEST_TIMESTAMP; ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES); Cell newKV = null; @@ -3447,6 +3453,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){ newKV = cell; indexStateKVIndex = index; + timeStamp = cell.getTimestamp(); } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0) { disableTimeStampKVIndex = index; @@ -3461,7 +3468,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } try { Get get = new Get(key); - get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp); get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES); get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES); get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); @@ -3481,6 +3487,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PIndexState currentState = PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV .getValueOffset()]); + // Timestamp of INDEX_STATE gets updated with each call + long actualTimestamp = currentStateKV.getTimestamp(); long curTimeStampVal = 0; if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) { curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(), @@ -3488,32 +3496,28 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // new DisableTimeStamp is passed in if (disableTimeStampKVIndex >= 0) { Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex); + long expectedTimestamp = newDisableTimeStampCell.getTimestamp(); + // If the index status has been updated after the upper bound of the scan we use + // to partially rebuild the index, then we need to fail the rebuild because an + // index write failed before the rebuild was complete. + if (actualTimestamp > expectedTimestamp) { + builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength()); - // We never set the INDEX_DISABLE_TIMESTAMP to a positive value when we're setting the state to ACTIVE. - // Instead, we're passing in what we expect the INDEX_DISABLE_TIMESTAMP to be currently. If it's - // changed, it means that a data table row failed to write while we were partially rebuilding it - // and we must rerun it. - if (newState == PIndexState.ACTIVE && newDisableTimeStamp > 0) { - // Don't allow setting to ACTIVE if the INDEX_DISABLE_TIMESTAMP doesn't match - // what we expect. - if (newDisableTimeStamp != Math.abs(curTimeStampVal)) { - builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION); - builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); - done.run(builder.build()); - return; - } - // Reset INDEX_DISABLE_TIMESTAMP_BYTES to zero as we're good to go. - newKVs.set(disableTimeStampKVIndex, - CellUtil.createCell(key, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES, - timeStamp, KeyValue.Type.Put.getCode(), PLong.INSTANCE.toBytes(0L))); - } // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative) // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to // drive the partial index rebuild rather than update it with each attempt to update the index // when a new data table write occurs. - if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) { - // not reset disable timestamp + // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the + // index in which case the state will be INACTIVE or PENDING_ACTIVE. + if (curTimeStampVal != 0 + && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE) + && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) { + // do not reset disable timestamp as we want to keep the min newKVs.remove(disableTimeStampKVIndex); disableTimeStampKVIndex = -1; } @@ -3545,29 +3549,42 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) { timeStamp = currentStateKV.getTimestamp(); } - if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE) - || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) { + if ((currentState == PIndexState.ACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.UNUSABLE) { newState = PIndexState.INACTIVE; newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue()))); - } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) { - newState = PIndexState.ACTIVE; + } else if ((currentState == PIndexState.INACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.USABLE) { + // Don't allow manual state change to USABLE (i.e. ACTIVE) if non zero INDEX_DISABLE_TIMESTAMP + if (curTimeStampVal != 0) { + newState = currentState; + } else { + newState = PIndexState.ACTIVE; + } newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue()))); } PTable returnTable = null; if (currentState != newState || disableTimeStampKVIndex != -1) { + // make a copy of tableMetadata so we can add to it + tableMetadata = new ArrayList<Mutation>(tableMetadata); + // Always include the empty column value at latest timestamp so + // that clients pull over update. + Put emptyValue = new Put(key); + emptyValue.add(TABLE_FAMILY_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES, + HConstants.LATEST_TIMESTAMP, + QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + tableMetadata.add(emptyValue); byte[] dataTableKey = null; - if(dataTableKV != null) { - dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue()); - } - if(dataTableKey != null) { - // make a copy of tableMetadata - tableMetadata = new ArrayList<Mutation>(tableMetadata); + if (dataTableKV != null) { + dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, CellUtil.cloneValue(dataTableKV)); // insert an empty KV to trigger time stamp update on data table row Put p = new Put(dataTableKey); - p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + p.add(TABLE_FAMILY_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES, + HConstants.LATEST_TIMESTAMP, + QueryConstants.EMPTY_COLUMN_VALUE_BYTES); tableMetadata.add(p); } boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable; @@ -3585,7 +3602,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1 || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) { - returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock); + returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock, request.getClientVersion()); } } // Get client timeStamp from mutations, since it may get updated by the
