PHOENIX-4178 Detect failed index write while rebuilder is running with index staying active


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d047fae9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d047fae9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d047fae9

Branch: refs/heads/4.x-HBase-0.98
Commit: d047fae97423b51f4b40f2b1793b575af8de451c
Parents: afe1e5f
Author: James Taylor <[email protected]>
Authored: Thu Sep 21 18:24:39 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Fri Sep 22 22:00:44 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  12 +-
 .../end2end/index/PartialIndexRebuilderIT.java  | 459 +++++++++++--------
 .../coprocessor/MetaDataEndpointImpl.java       | 179 ++++----
 .../phoenix/coprocessor/MetaDataProtocol.java   |   6 +-
 .../coprocessor/MetaDataRegionObserver.java     |  80 +---
 .../apache/phoenix/execute/MutationState.java   |  10 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  25 +-
 .../index/PhoenixIndexFailurePolicy.java        |  12 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   1 +
 .../apache/phoenix/optimize/QueryOptimizer.java |   6 +-
 .../phoenix/query/QueryServicesOptions.java     |  16 +-
 .../org/apache/phoenix/schema/PIndexState.java  |   3 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  14 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  10 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |  41 +-
 15 files changed, 496 insertions(+), 378 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d047fae9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index a24e93f..0163620 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -36,7 +37,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -138,12 +138,12 @@ public class MutableIndexFailureIT extends BaseTest {
         // need to override rpc retries otherwise test doesn't pass
         serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, 
Long.toString(numRpcRetries));
         
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
 Long.toString(forwardOverlapMs));
+        
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, 
Long.toString(disableTimestampThresholdMs));
         /*
          * Effectively disable running the index rebuild task by having an 
infinite delay
          * because we want to control it's execution ourselves
          */
         serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, 
Long.toString(Long.MAX_VALUE));
-        
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, 
Long.toString(disableTimestampThresholdMs));
         Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
@@ -299,10 +299,10 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table 
write failure
+            String indexState = rs.getString("INDEX_STATE");
             if (transactional || leaveIndexActiveOnFailure) {
-                assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
+                assertTrue(PIndexState.ACTIVE.toString().equals(indexState) || 
PIndexState.PENDING_ACTIVE.toString().equals(indexState));
             } else {
-                String indexState = rs.getString("INDEX_STATE");
                 assertTrue(PIndexState.DISABLE.toString().equals(indexState) 
|| PIndexState.INACTIVE.toString().equals(indexState));
             }
             assertFalse(rs.next());
@@ -311,7 +311,7 @@ public class MutableIndexFailureIT extends BaseTest {
             // in an all or none manner. If the table is not transactional, 
then the data writes
             // would have succeeded while the index writes would have failed.
             if (!transactional) {
-                updateTableAgain(conn, leaveIndexActiveOnFailure);
+                updateTableAgain(conn, false);
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -518,7 +518,7 @@ public class MutableIndexFailureIT extends BaseTest {
         public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
 
         @Override
-        public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+        public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
             boolean throwException = false;
             if 
(c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" 
+ FAIL_INDEX_NAME)
                     && FAIL_WRITE) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d047fae9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index dfe5a28..9095dbe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Map;
 import java.util.Random;
@@ -32,7 +33,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -40,11 +41,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import 
org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
@@ -64,23 +67,68 @@ import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
 
+@SuppressWarnings("deprecation")
 @RunWith(RunUntilFailure.class)
 public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartialIndexRebuilderIT.class);
     private static final Random RAND = new Random(5);
     private static final int WAIT_AFTER_DISABLED = 5000;
-    private static final int REBUILD_INTERVAL = 2000;
+    private static final long REBUILD_PERIOD = 50000;
+    private static final long REBUILD_INTERVAL = 2000;
+    private static RegionCoprocessorEnvironment 
indexRebuildTaskRegionEnvironment;
 
+    
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
Boolean.TRUE.toString());
         
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
Long.toString(REBUILD_INTERVAL));
-        
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, 
"120000"); // give up rebuilding after 2 minutes
+        
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, 
"300000"); // give up rebuilding after 5 minutes
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, 
Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
         
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
 Long.toString(WAIT_AFTER_DISABLED));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
ReadOnlyProps.EMPTY_PROPS);
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                        .getRSForFirstRegionInTable(
+                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        
.getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
+    }
+
+    private static void runIndexRebuilder() throws InterruptedException, 
SQLException {
+        BuildIndexScheduleTask task =
+                new MetaDataRegionObserver.BuildIndexScheduleTask(
+                        indexRebuildTaskRegionEnvironment);
+        task.run();
+    }
+    
+    private static void runIndexRebuilderAsync(final int interval, final 
boolean[] cancel) {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!cancel[0]) {
+                    try {
+                        runIndexRebuilder();
+                        Thread.sleep(interval);
+                    } catch (InterruptedException e) {
+                        Thread.interrupted();
+                        throw new RuntimeException(e);
+                    } catch (SQLException e) {
+                        LOG.error(e.getMessage(),e);
+                    }
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
     }
 
     private static void mutateRandomly(final String fullTableName, final int 
nThreads, final int nRows, final int nIndexValues, final int batchSize, final 
CountDownLatch doneSignal) {
@@ -134,7 +182,6 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
     @Test
-    @Repeat(5)
     public void testConcurrentUpsertsWithRebuild() throws Throwable {
         int nThreads = 5;
         final int batchSize = 200;
@@ -155,12 +202,17 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         assertTrue("Ran out of time", doneSignal1.await(120, 
TimeUnit.SECONDS));
         
         IndexUtil.updateIndexState(fullIndexName, 
EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
-        do {
-            final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
-            mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, 
batchSize, doneSignal2);
-            assertTrue("Ran out of time", doneSignal2.await(500, 
TimeUnit.SECONDS));
-        } while (!TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
-        
+        boolean[] cancel = new boolean[1];
+        try {
+            do {
+                final CountDownLatch doneSignal2 = new 
CountDownLatch(nThreads);
+                runIndexRebuilderAsync(500,cancel);
+                mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, 
batchSize, doneSignal2);
+                assertTrue("Ran out of time", doneSignal2.await(500, 
TimeUnit.SECONDS));
+            } while (!TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
+        } finally {
+            cancel[0] = true;
+        }
         long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
fullTableName, fullIndexName);
         assertEquals(nRows, actualRowCount);
     }
@@ -172,17 +224,21 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
     private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey 
key) throws TableNotFoundException {
         PTable table = metaCache.getTableRef(key).getTable();
         for (PTable index : table.getIndexes()) {
-            if (index.getIndexState() == PIndexState.ACTIVE) {
+            if (index.getIndexState() == PIndexState.INACTIVE) {
                 return true;
             }
         }
         return false;
     }
-    
+
     private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey 
key) throws TableNotFoundException {
+        return hasIndexWithState(metaCache, key, PIndexState.DISABLE);
+    }
+
+    private static boolean hasIndexWithState(PMetaData metaCache, PTableKey 
key, PIndexState expectedState) throws TableNotFoundException {
         PTable table = metaCache.getTableRef(key).getTable();
         for (PTable index : table.getIndexes()) {
-            if (index.getIndexState() == PIndexState.DISABLE) {
+            if (index.getIndexState() == expectedState) {
                 return true;
             }
         }
@@ -304,8 +360,14 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             long disableTS = EnvironmentEdgeManager.currentTimeMillis();
             HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, 
PIndexState.DISABLE);
-            mutateRandomly(conn, fullTableName, nRows);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            boolean[] cancel = new boolean[1];
+            try {
+                runIndexRebuilderAsync(500,cancel);
+                mutateRandomly(conn, fullTableName, nRows);
+                TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            } finally {
+                cancel[0] = true;
+            }
             
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
fullTableName, fullIndexName);
             assertEquals(nRows,actualRowCount);
@@ -346,12 +408,16 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             t.start();
             long disableTS = EnvironmentEdgeManager.currentTimeMillis();
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, 
PIndexState.DISABLE);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
-            doneSignal.await(60, TimeUnit.SECONDS);
+            boolean[] cancel = new boolean[1];
+            try {
+                runIndexRebuilderAsync(500,cancel);
+                TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+                doneSignal.await(60, TimeUnit.SECONDS);
+            } finally {
+                cancel[0] = true;
+            }
             assertTrue(hasInactiveIndex[0]);
             
-            TestUtil.dumpIndexStatus(conn, fullIndexName);
-
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
fullTableName, fullIndexName);
             assertEquals(nRows,actualRowCount);
             
@@ -379,7 +445,10 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','eeeee')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -406,7 +475,10 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -433,7 +505,10 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a',null)");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -458,7 +533,10 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("DELETE FROM " + fullTableName);
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
        }
@@ -483,7 +561,10 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, 
PIndexState.DISABLE);
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','b')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -526,9 +607,11 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
             conn.commit();
             clock.time += 1000;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -536,7 +619,7 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    public void testSeparateTimeBatchesRequired() throws Throwable {
+    public void testTimeBatchesInCoprocessorRequired() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
         String indexName = generateUniqueName();
@@ -572,9 +655,11 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             clock.time += 100;
             IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, 
PIndexState.DISABLE);
             clock.time += 100;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -582,7 +667,7 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    public void testMultiValuesWhenDisableAndInactive() throws Throwable {
+    public void testBatchingDuringRebuild() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
         String indexName = generateUniqueName();
@@ -593,55 +678,45 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         EnvironmentEdgeManager.injectEdge(clock);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) 
COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
             clock.time += 100;
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v1, v2) INCLUDE (v3)");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v1, v2)");
             clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a','0','x')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a','0')");
             conn.commit();
             clock.time += 100;
-            try (HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES))
 {
-                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the 
partial index rebuilder from triggering
-                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, 
PIndexState.DISABLE);
-                clock.time += 100;
-                long disableTime = clock.currentTime();
-                // Set some values while index disabled
-                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('b','bb', '11','yy')");
-                conn.commit();
-                clock.time += 100;
-                assertTrue(hasDisabledIndex(metaCache, key));
-                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','ccc','222','zzz')");
-                conn.commit();
-                clock.time += 100;
-                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','dddd','3333','zzzz')");
-                conn.commit();
-                clock.time += 100;
-                // Will cause partial index rebuilder to be triggered
-                IndexUtil.updateIndexState(fullIndexName, disableTime, 
metaTable, PIndexState.DISABLE);
-            }
-            final CountDownLatch doneSignal = new CountDownLatch(1);
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock, 
doneSignal);
-            doneSignal.await(30, TimeUnit.SECONDS);
-            // Set some values while index is in INACTIVE state
+            HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            long disableTime = clock.currentTime();
+            IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, 
PIndexState.DISABLE);
             clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','eeeee','44444','zzzzz')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('bb','bb', '11')");
             conn.commit();
-            clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','fffff','55555','zzzzzz')");
+            clock.time += REBUILD_PERIOD;
+            assertTrue(hasDisabledIndex(metaCache, key));
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('ccc','ccc','222')");
             conn.commit();
-            clock.time += WAIT_AFTER_DISABLED;
-            // Enough time has passed, so rebuild will start now
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            assertEquals(3,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
             clock.time += 100;
+            
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertEquals(2,TestUtil.getRowCount(conn, fullIndexName));
+            
+            clock.time += REBUILD_PERIOD;
+            runIndexRebuilder();
+            // Verify that other batches were processed
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
         }
     }
-
-    private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new 
CountDownLatch(1);
-    private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new 
CountDownLatch(1);
-
+    
     @Test
     public void testUpperBoundSetOnRebuild() throws Throwable {
         String schemaName = generateUniqueName();
@@ -665,7 +740,7 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a', '0')");
             conn.commit();
             // Set clock forward in time past the "overlap" amount we wait for 
index maintenance to kick in
-            clock.time += 10 * WAIT_AFTER_DISABLED;
+            clock.time += 2 * WAIT_AFTER_DISABLED;
             assertTrue(hasDisabledIndex(metaCache, key));
             assertEquals(1,TestUtil.getRowCount(conn, fullTableName));
             assertEquals(0,TestUtil.getRowCount(conn, fullIndexName));
@@ -676,9 +751,11 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             // Set clock back in time and start rebuild
             clock.time = disableTime + 100;
             IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, 
PIndexState.DISABLE);
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            clock.time += REBUILD_INTERVAL;
-            waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE, 
clock, REBUILD_INTERVAL);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
             // If an upper bound was set on the rebuilder, we should only have 
found one row
             assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
@@ -687,21 +764,8 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private static void waitForIndexRebuild(Connection conn, String 
fullIndexName, PIndexState expectedIndexState, MyClock clock, long increment) 
throws InterruptedException, SQLException {
-        int maxTries = 60, nTries = 0;
-        do {
-            Thread.sleep(1000); // sleep 1 sec
-            clock.time += increment;
-            if (TestUtil.checkIndexState(conn, fullIndexName, 
expectedIndexState, 0L)) {
-                return;
-            }
-        } while (++nTries < maxTries);
-        fail("Ran out of time waiting for index state to become " + 
expectedIndexState);
-    }
-
-
     @Test
-    public void testDisableIndexDuringRebuild() throws Throwable {
+    public void testMultiValuesWhenDisableAndInactive() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
         String indexName = generateUniqueName();
@@ -737,47 +801,21 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
                 clock.time += 100;
                 // Will cause partial index rebuilder to be triggered
                 IndexUtil.updateIndexState(fullIndexName, disableTime, 
metaTable, PIndexState.DISABLE);
-                final CountDownLatch doneSignal = new CountDownLatch(1);
-                advanceClockUntilPartialRebuildStarts(fullIndexName, clock, 
doneSignal);
-                // Set some values while index is in INACTIVE state
-                clock.time += 100;
-                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','eeeee','44444','zzzzz')");
-                conn.commit();
-                clock.time += 100;
-                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','fffff','55555','zzzzzz')");
-                conn.commit();
-                doneSignal.await(30, TimeUnit.SECONDS);
-                // Install coprocessor that will simulate an index write 
failure during index rebuild
-                
TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class);
-                clock.time += WAIT_AFTER_DISABLED;
-                doneSignal.await(30, TimeUnit.SECONDS);
-                WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
-                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the 
partial index rebuilder from triggering
-                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, 
PIndexState.DISABLE);
-                   clock.time += 100;
-                   disableTime = clock.currentTime();
-                   // Set some values while index disabled
-                   conn.createStatement().execute("UPSERT INTO " + 
fullTableName + " VALUES('b','bbbbb', '11','yy')");
-                   conn.commit();
-                   clock.time += 100;
-                   conn.createStatement().execute("UPSERT INTO " + 
fullTableName + " VALUES('a','cccccc','222','zzz')");
-                   conn.commit();
-                   clock.time += 100;
-                   conn.createStatement().execute("UPSERT INTO " + 
fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
-                   conn.commit();
-                   clock.time += 100;
-                   // Simulates another write failure. Should cause current 
run of rebuilder to fail and retry again later
-                   IndexUtil.updateIndexState(fullIndexName, disableTime, 
metaTable, PIndexState.DISABLE);
-                removeWriteFailingCoprocessor(conn,fullIndexName);
-                   WAIT_FOR_INDEX_WRITE.countDown();
             }
-            // Original rebuilder should have failed
-            
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            clock.time += WAIT_AFTER_DISABLED * 2;
-            // Enough time has passed, so rebuild will start now
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+
+            // Set some values while index is in INACTIVE state
             clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','eeeee','44444','zzzzz')");
+            conn.commit();
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','fffff','55555','zzzzzz')");
+            conn.commit();
+            clock.time += WAIT_AFTER_DISABLED;
+            // Enough time has passed, so rebuild will start now
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -785,6 +823,100 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testIndexWriteFailureDisablingIndex() throws Throwable {
+        testIndexWriteFailureDuringRebuild(PIndexState.DISABLE);
+    }
+    
+    @Test
+    public void testIndexWriteFailureLeavingIndexActive() throws Throwable {
+        testIndexWriteFailureDuringRebuild(PIndexState.PENDING_ACTIVE);
+    }
+    
+    private void testIndexWriteFailureDuringRebuild(PIndexState 
indexStateOnFailure) throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, 
indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) 
COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = " + 
(indexStateOnFailure == PIndexState.DISABLE));
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v1, v2)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a','0')");
+            conn.commit();
+            clock.time += 100;
+            HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+
+            long disableTime = clock.currentTime();
+            // Simulates an index write failure
+            IndexUtil.updateIndexState(fullIndexName, indexStateOnFailure == 
PIndexState.DISABLE ? disableTime : -disableTime, metaTable, 
indexStateOnFailure);
+            
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('bb','bb', '11')");
+            conn.commit();
+            
+            // Large enough to be in separate time batch
+            clock.time += 2 * REBUILD_PERIOD;
+            assertTrue(hasIndexWithState(metaCache, key, indexStateOnFailure));
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('ccc','ccc','222')");
+            conn.commit();
+            assertEquals(3,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            clock.time += 100;
+
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : 
PIndexState.ACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            
+            // First batch should have been processed
+            runIndexRebuilder();
+            assertEquals(2,TestUtil.getRowCount(conn, fullIndexName));
+
+            // Simulate write failure
+            TestUtil.addCoprocessor(conn, fullIndexName, 
WriteFailingRegionObserver.class);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('dddd','dddd','3333')");
+            try {
+                conn.commit();
+                fail();
+            } catch (CommitException e) {
+                // Expected
+            }
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
indexStateOnFailure, null));
+            PhoenixStatement stmt = 
conn.createStatement().unwrap(PhoenixStatement.class);
+            ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName 
+ " WHERE V1 = 'a'");
+            assertTrue(rs.next());
+            assertEquals("0", rs.getString(1));
+            assertEquals(indexStateOnFailure == PIndexState.DISABLE ? 
fullTableName : fullIndexName, 
stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+            TestUtil.removeCoprocessor(conn, fullIndexName, 
WriteFailingRegionObserver.class);
+            
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : 
PIndexState.ACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            
+            // First batch should have been processed again because we started 
over
+            runIndexRebuilder();
+            assertEquals(3,TestUtil.getRowCount(conn, fullIndexName));
+
+            clock.time += 2 * REBUILD_PERIOD;
+            // Second batch should have been processed now
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
+            
+            // Verify that other batches were processed
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+    
+    @Test
     public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
@@ -808,9 +940,11 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
             conn.commit();
             clock.time += 1000;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -841,9 +975,11 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("DELETE FROM " + fullTableName + " 
WHERE k='a'");
             conn.commit();
             clock.time += 1000;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, 
PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -873,65 +1009,10 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
     
-    private static void advanceClockUntilPartialRebuildStarts(final String 
fullIndexName, final MyClock clock) throws InterruptedException {
-        final CountDownLatch doneSignal = new CountDownLatch(1);
-        advanceClockUntilPartialRebuildStarts(fullIndexName, clock, 
doneSignal);
-        clock.time += WAIT_AFTER_DISABLED + 1000;
-        doneSignal.await(30, TimeUnit.SECONDS);
-    }
-    
-    private static void advanceClockUntilPartialRebuildStarts(final String 
fullIndexName, final MyClock clock, final CountDownLatch doneSignal) {
-        Runnable r = new Runnable() {
-            @Override
-            public void run() {
-                try (Connection conn = DriverManager.getConnection(getUrl())) {
-                  int nTries = 10;
-                    while (--nTries >0 && !TestUtil.checkIndexState(conn, 
fullIndexName, PIndexState.INACTIVE, null)) {
-                        Thread.sleep(1000);
-                        clock.time += 1000;
-                    }
-                    doneSignal.countDown();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-        Thread t = new Thread(r);
-        t.setDaemon(true);
-        t.start();
-    }
-    
-    private static void removeWriteFailingCoprocessor(Connection conn, String 
tableName) throws Exception {
-        ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HTableDescriptor descriptor = 
services.getTableDescriptor(Bytes.toBytes(tableName));
-        
descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
-        int numTries = 10;
-        try (HBaseAdmin admin = services.getAdmin()) {
-            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
-            while 
(!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
-                    && numTries > 0) {
-                numTries--;
-                if (numTries == 0) {
-                    throw new Exception(
-                            "Check to detect if delaying co-processor was 
removed failed after "
-                                    + numTries + " retries.");
-                }
-                Thread.sleep(1000);
-            }
-        }
-    }
-    
     public static class WriteFailingRegionObserver extends 
SimpleRegionObserver {
         @Override
         public void 
postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-               WAIT_FOR_REBUILD_TO_START.countDown();
-               try {
-                               WAIT_FOR_INDEX_WRITE.await(30, 
TimeUnit.SECONDS);
-                       } catch (InterruptedException e) {
-                               Thread.interrupted();
-                               throw new IOException(e);
-                       }
+            throw new DoNotRetryIOException("Simulating write failure on " + 
c.getEnvironment().getRegionInfo().getTable().getNameAsString());
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d047fae9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 04e1604..7e9450b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -514,7 +514,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             }
 
             long currentTime = EnvironmentEdgeManager.currentTimeMillis();
-            PTable table = doGetTable(key, request.getClientTimestamp());
+            PTable table = doGetTable(key, request.getClientTimestamp(), 
request.getClientVersion());
             if (table == null) {
                 
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                 builder.setMutationTime(currentTime);
@@ -526,7 +526,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? 
disableIndexTimestamp : Long.MAX_VALUE;
             for (PTable index : table.getIndexes()) {
                 disableIndexTimestamp = index.getIndexDisableTimestamp();
-                if (disableIndexTimestamp > 0 && index.getIndexState() == 
PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
+                if (disableIndexTimestamp > 0 && (index.getIndexState() == 
PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && 
disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
                     minNonZerodisableIndexTimestamp = disableIndexTimestamp;
                 }
             }
@@ -553,7 +553,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     }
 
     private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion 
region,
-            long clientTimeStamp) throws IOException, SQLException {
+            long clientTimeStamp, int clientVersion) throws IOException, 
SQLException {
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, 
clientTimeStamp);
         Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
         try (RegionScanner scanner = region.getScanner(scan)) {
@@ -562,7 +562,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             PTable newTable;
             boolean blockWriteRebuildIndex = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
                     QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
-            newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
+            newTable = getTable(scanner, clientTimeStamp, tableTimeStamp, 
clientVersion);
             if (newTable == null) {
                 return null;
             }
@@ -645,9 +645,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         }
     }
 
-    private void addIndexToTable(PName tenantId, PName schemaName, PName 
indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws 
IOException, SQLException {
+    private void addIndexToTable(PName tenantId, PName schemaName, PName 
indexName, PName tableName, long clientTimeStamp, List<PTable> indexes, int 
clientVersion) throws IOException, SQLException {
         byte[] key = SchemaUtil.getTableKey(tenantId == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), 
indexName.getBytes());
-        PTable indexTable = doGetTable(key, clientTimeStamp);
+        PTable indexTable = doGetTable(key, clientTimeStamp, clientVersion);
         if (indexTable == null) {
             ServerUtil.throwIOException("Index not found", new 
TableNotFoundException(schemaName.getString(), indexName.getString()));
             return;
@@ -795,7 +795,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         arguments.add(arg);
     }
 
-    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long 
tableTimeStamp)
+    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long 
tableTimeStamp, int clientVersion)
         throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
         scanner.next(results);
@@ -896,6 +896,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         PIndexState indexState =
                 indexStateKv == null ? null : 
PIndexState.fromSerializedValue(indexStateKv
                         .getValueArray()[indexStateKv.getValueOffset()]);
+        // If client is not yet up to 4.12, then translate PENDING_ACTIVE to 
ACTIVE (as would have been
+        // the value in those versions) since the client won't have this index 
state in its enum.
+        if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < 
PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) {
+            indexState = PIndexState.ACTIVE;
+        }
         Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
         boolean isImmutableRows =
                 immutableRowsKv == null ? false : (Boolean) 
PBoolean.INSTANCE.toObject(
@@ -981,7 +986,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
           } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, 
LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), 
colKv.getQualifierLength())==0) {    
               LinkType linkType = 
LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
               if (linkType == LinkType.INDEX_TABLE) {
-                  addIndexToTable(tenantId, schemaName, famName, tableName, 
clientTimeStamp, indexes);
+                  addIndexToTable(tenantId, schemaName, famName, tableName, 
clientTimeStamp, indexes, clientVersion);
               } else if (linkType == LinkType.PHYSICAL_TABLE) {
                   physicalTables.add(famName);
               } else if (linkType == LinkType.PARENT_TABLE) {
@@ -1247,13 +1252,13 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     }
 
     private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
-        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, 
int clientVersion)
         throws IOException, SQLException {
         HRegion region = env.getRegion();
         Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
         // We always cache the latest version - fault in if not in cache
-        if (table != null || (table = buildTable(key, cacheKey, region, 
asOfTimeStamp)) != null) {
+        if (table != null || (table = buildTable(key, cacheKey, region, 
asOfTimeStamp, clientVersion)) != null) {
             return table;
         }
         // if not found then check if newer table already exists and add 
delete marker for timestamp
@@ -1339,6 +1344,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         byte[] schemaName = null;
         byte[] tableName = null;
         try {
+            int clientVersion = request.getClientVersion();
             List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
             MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, 
rowKeyMetaData);
             byte[] tenantIdBytes = 
rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -1400,7 +1406,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                         acquireLock(region, parentTableKey, locks);
                         parentCacheKey = new ImmutableBytesPtr(parentTableKey);
                         parentTable = loadTable(env, parentTableKey, 
parentCacheKey, clientTimeStamp,
-                                clientTimeStamp);
+                                clientTimeStamp, clientVersion);
                         if (parentTable == null || 
isTableDeleted(parentTable)) {
                             
builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                             
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
@@ -1433,7 +1439,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 // Get as of latest timestamp so we can detect if we have a 
newer table that already
                 // exists without making an additional query
                 PTable table =
-                        loadTable(env, tableKey, cacheKey, clientTimeStamp, 
HConstants.LATEST_TIMESTAMP);
+                        loadTable(env, tableKey, cacheKey, clientTimeStamp, 
HConstants.LATEST_TIMESTAMP, clientVersion);
                 if (table != null) {
                     if (table.getTimeStamp() < clientTimeStamp) {
                         // If the table is older than the client time stamp 
and it's deleted,
@@ -1639,8 +1645,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     }
     
     private void findAllChildViews(HRegion region, byte[] tenantId, PTable 
table,
-            TableViewFinder result, long clientTimeStamp) throws IOException, 
SQLException {
-        TableViewFinder currResult = findChildViews(region, tenantId, table);
+            TableViewFinder result, long clientTimeStamp, int clientVersion) 
throws IOException, SQLException {
+        TableViewFinder currResult = findChildViews(region, tenantId, table, 
clientVersion);
         result.addResult(currResult);
         for (ViewInfo viewInfo : currResult.getViewInfoList()) {
             byte[] viewtenantId = viewInfo.getTenantId();
@@ -1648,8 +1654,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             byte[] viewTable = viewInfo.getViewName();
             byte[] tableKey = SchemaUtil.getTableKey(viewtenantId, viewSchema, 
viewTable);
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
-            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, 
clientTimeStamp);
-            findAllChildViews(region, viewtenantId, view, result, 
clientTimeStamp);
+            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, 
clientTimeStamp, clientVersion);
+            findAllChildViews(region, viewtenantId, view, result, 
clientTimeStamp, clientVersion);
         }
     }
         
@@ -1773,7 +1779,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     private static final byte[] PHYSICAL_TABLE_BYTES =
             new byte[] { PTable.LinkType.PHYSICAL_TABLE.getSerializedValue() };
 
-    private TableViewFinder findChildViews(HRegion region, byte[] tenantId, 
PTable table)
+    private TableViewFinder findChildViews(HRegion region, byte[] tenantId, 
PTable table, int clientVersion)
             throws IOException, SQLException {
         byte[] tableKey =
                 SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
@@ -1782,7 +1788,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
         PTable systemCatalog =
                 loadTable(env, tableKey, cacheKey, MIN_SYSTEM_TABLE_TIMESTAMP,
-                    HConstants.LATEST_TIMESTAMP);
+                    HConstants.LATEST_TIMESTAMP, clientVersion);
         if (systemCatalog.getTimeStamp() < MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) {
             return findChildViews_deprecated(region, tenantId, table, 
PHYSICAL_TABLE_BYTES);
         } else {
@@ -1840,7 +1846,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 result =
                         doDropTable(key, tenantIdBytes, schemaName, tableName, 
parentTableName,
                             PTableType.fromSerializedValue(tableType), 
tableMetadata,
-                            invalidateList, locks, tableNamesToDelete, 
sharedTablesToDelete, isCascade);
+                            invalidateList, locks, tableNamesToDelete, 
sharedTablesToDelete, isCascade, request.getClientVersion());
                 if (result.getMutationCode() != 
MutationCode.TABLE_ALREADY_EXISTS) {
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
@@ -1871,7 +1877,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, 
byte[] schemaName,
         byte[] tableName, byte[] parentTableName, PTableType tableType, 
List<Mutation> rowsToDelete,
         List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
-        List<byte[]> tableNamesToDelete, List<SharedTableState> 
sharedTablesToDelete, boolean isCascade) throws IOException, SQLException {
+        List<byte[]> tableNamesToDelete, List<SharedTableState> 
sharedTablesToDelete, boolean isCascade, int clientVersion) throws IOException, 
SQLException {
 
 
         long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
@@ -1884,7 +1890,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
 
         // We always cache the latest version - fault in if not in cache
         if (table != null
-                || (table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP)) != null) {
+                || (table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP, clientVersion)) != null) {
             if (table.getTimeStamp() < clientTimeStamp) {
                 if (isTableDeleted(table) || tableType != table.getType()) {
                     return new 
MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 
EnvironmentEdgeManager.currentTimeMillis(), null);
@@ -1921,7 +1927,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
 
             if (tableType == PTableType.TABLE || tableType == 
PTableType.SYSTEM) {
                 // Handle any child views that exist
-                TableViewFinder tableViewFinderResult = findChildViews(region, 
tenantId, table);
+                TableViewFinder tableViewFinderResult = findChildViews(region, 
tenantId, table, clientVersion);
                 if (tableViewFinderResult.hasViews()) {
                     if (isCascade) {
                         if (tableViewFinderResult.allViewsInMultipleRegions()) 
{
@@ -1941,7 +1947,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                                 acquireLock(region, viewKey, locks);
                                 MetaDataMutationResult result = 
doDropTable(viewKey, viewTenantId, viewSchemaName,
                                         viewName, null, PTableType.VIEW, 
rowsToDelete, invalidateList, locks,
-                                        tableNamesToDelete, 
sharedTablesToDelete, false);
+                                        tableNamesToDelete, 
sharedTablesToDelete, false, clientVersion);
                                 if (result.getMutationCode() != 
MutationCode.TABLE_ALREADY_EXISTS) { return result; }
                             }
                         }
@@ -2004,7 +2010,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             acquireLock(region, indexKey, locks);
             MetaDataMutationResult result =
                     doDropTable(indexKey, tenantId, schemaName, indexName, 
tableName, PTableType.INDEX,
-                        rowsToDelete, invalidateList, locks, 
tableNamesToDelete, sharedTablesToDelete, false);
+                        rowsToDelete, invalidateList, locks, 
tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) 
{
                 return result;
             }
@@ -2022,7 +2028,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     }
 
     private MetaDataMutationResult
-    mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws 
IOException {
+    mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator, int 
clientVersion) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, 
rowKeyMetaData);
         byte[] tenantId = 
rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -2056,7 +2062,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 // Get client timeStamp from mutations
                 long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(tableMetadata);
                 if (table == null
-                        && (table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP)) == null) {
+                        && (table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
                     // if not found then call newerTableExists and add delete 
marker for timestamp
                     // found
                     table = buildDeletedTable(key, cacheKey, region, 
clientTimeStamp);
@@ -2127,7 +2133,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 if (result !=null) {
                     return result;
                 } else {
-                    table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP);
+                    table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP, clientVersion);
                     return new 
MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
                 }
             } finally {
@@ -2286,7 +2292,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     
     private MetaDataMutationResult 
addColumnsAndTablePropertiesToChildViews(PTable basePhysicalTable, 
List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, 
byte[] schemaName, byte[] tableName,
             List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, 
TableViewFinder childViewsResult,
-            HRegion region, List<RowLock> locks) throws IOException, 
SQLException {
+            HRegion region, List<RowLock> locks, int clientVersion) throws 
IOException, SQLException {
         List<PutWithOrdinalPosition> columnPutsForBaseTable = 
Lists.newArrayListWithExpectedSize(tableMetadata.size());
         Map<TableProperty, Cell> tablePropertyCellMap = 
Maps.newHashMapWithExpectedSize(tableMetadata.size());
         // Isolate the puts relevant to adding columns. Also figure out what 
kind of columns are being added.
@@ -2333,7 +2339,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             
             // lock the rows corresponding to views so that no other thread 
can modify the view meta-data
             RowLock viewRowLock = acquireLock(region, viewKey, locks);
-            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
+            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, 
clientVersion);
             
             ColumnOrdinalPositionUpdateList ordinalPositionList = new 
ColumnOrdinalPositionUpdateList();
             List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
@@ -2632,7 +2638,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             PTable basePhysicalTable, List<RowLock> locks, List<Mutation> 
tableMetadata,
             List<Mutation> mutationsForAddingColumnsToViews, byte[] 
schemaName, byte[] tableName,
             List<ImmutableBytesPtr> invalidateList, long clientTimeStamp,
-            TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, 
List<SharedTableState> sharedTablesToDelete)
+            TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, 
List<SharedTableState> sharedTablesToDelete, int clientVersion)
             throws IOException, SQLException {
         List<Delete> columnDeletesForBaseTable = new 
ArrayList<>(tableMetadata.size());
         // Isolate the deletes relevant to dropping columns. Also figure out 
what kind of columns
@@ -2658,7 +2664,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             // lock the rows corresponding to views so that no other thread 
can modify the view
             // meta-data
             RowLock viewRowLock = acquireLock(region, viewKey, locks);
-            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
+            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, 
clientVersion);
 
             ColumnOrdinalPositionUpdateList ordinalPositionList =
                     new ColumnOrdinalPositionUpdateList();
@@ -2729,7 +2735,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     dropIndexes(view, region, invalidateList, locks, 
clientTimeStamp,
                         schemaName, view.getName().getBytes(),
                         mutationsForAddingColumnsToViews, existingViewColumn,
-                        tableNamesToDelete, sharedTablesToDelete);
+                        tableNamesToDelete, sharedTablesToDelete, 
clientVersion);
                 }
             }
 
@@ -2951,7 +2957,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     List<Mutation> mutationsForAddingColumnsToViews = 
Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + 
table.getIndexes().size()));
                     if (type == PTableType.TABLE || type == PTableType.SYSTEM) 
{
                         TableViewFinder childViewsResult = new 
TableViewFinder();
-                        findAllChildViews(region, tenantId, table, 
childViewsResult, clientTimeStamp);
+                        findAllChildViews(region, tenantId, table, 
childViewsResult, clientTimeStamp, request.getClientVersion());
                         if (childViewsResult.hasViews()) {
                            /* 
                             * Dis-allow if:
@@ -2976,7 +2982,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                             } else {
                                 mutationsForAddingColumnsToViews = new 
ArrayList<>(childViewsResult.getViewInfoList().size() * tableMetaData.size());
                                 MetaDataMutationResult mutationResult = 
addColumnsAndTablePropertiesToChildViews(table, tableMetaData, 
mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, 
clientTimeStamp,
-                                        childViewsResult, region, locks);
+                                        childViewsResult, region, locks, 
request.getClientVersion());
                                 // return if we were not able to add the 
column successfully
                                 if (mutationResult!=null)
                                        return mutationResult;
@@ -3067,7 +3073,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     tableMetaData.addAll(mutationsForAddingColumnsToViews);
                     return null;
                 }
-            });
+            }, request.getClientVersion());
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
@@ -3078,11 +3084,11 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         }
     }
 
-    private PTable doGetTable(byte[] key, long clientTimeStamp) throws 
IOException, SQLException {
-        return doGetTable(key, clientTimeStamp, null);
+    private PTable doGetTable(byte[] key, long clientTimeStamp, int 
clientVersion) throws IOException, SQLException {
+        return doGetTable(key, clientTimeStamp, null, clientVersion);
     }
 
-    private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock 
rowLock) throws IOException, SQLException {
+    private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock 
rowLock, int clientVersion) throws IOException, SQLException {
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -3134,13 +3140,13 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 return table;
             }
             // Query for the latest table first, since it's not cached
-            table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP);
+            table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP, clientVersion);
             if ((table != null && table.getTimeStamp() < clientTimeStamp) || 
                     (blockWriteRebuildIndex && 
table.getIndexDisableTimestamp() > 0)) {
                 return table;
             }
             // Otherwise, query for an older version of the table - it won't 
be cached
-            return buildTable(key, cacheKey, region, clientTimeStamp);
+            return buildTable(key, cacheKey, region, clientTimeStamp, 
clientVersion);
         } finally {
             if (!wasLocked) rowLock.release();
         }
@@ -3207,7 +3213,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     }
 
     @Override
-    public void dropColumn(RpcController controller, DropColumnRequest request,
+    public void dropColumn(RpcController controller, final DropColumnRequest 
request,
             RpcCallback<MetaDataResponse> done) {
         List<Mutation> tableMetaData = null;
         final List<byte[]> tableNamesToDelete = Lists.newArrayList();
@@ -3229,13 +3235,13 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     PTableType type = table.getType();
                     if (type == PTableType.TABLE || type == PTableType.SYSTEM) 
{
                         TableViewFinder childViewsResult = new 
TableViewFinder();
-                        findAllChildViews(region, tenantId, table, 
childViewsResult, clientTimeStamp);
+                        findAllChildViews(region, tenantId, table, 
childViewsResult, clientTimeStamp, request.getClientVersion());
                         if (childViewsResult.hasViews()) {
                             MetaDataMutationResult mutationResult =
                                     dropColumnsFromChildViews(region, table,
                                         locks, tableMetaData, 
additionalTableMetaData,
                                         schemaName, tableName, invalidateList,
-                                        clientTimeStamp, childViewsResult, 
tableNamesToDelete, sharedTablesToDelete);
+                                        clientTimeStamp, childViewsResult, 
tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
                             // return if we were not able to drop the column 
successfully
                             if (mutationResult != null) return mutationResult;
                         }
@@ -3293,7 +3299,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                                     dropIndexes(table, region, invalidateList, 
locks,
                                         clientTimeStamp, schemaName, tableName,
                                         additionalTableMetaData, 
columnToDelete,
-                                        tableNamesToDelete, 
sharedTablesToDelete);
+                                        tableNamesToDelete, 
sharedTablesToDelete, request.getClientVersion());
                                 } catch (ColumnFamilyNotFoundException e) {
                                     return new MetaDataMutationResult(
                                             MutationCode.COLUMN_NOT_FOUND, 
EnvironmentEdgeManager
@@ -3316,7 +3322,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     long currentTime = 
MetaDataUtil.getClientTimeStamp(tableMetaData);
                     return new 
MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, 
tableNamesToDelete, sharedTablesToDelete);
                 }
-            });
+            }, request.getClientVersion());
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
@@ -3330,7 +3336,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     private void dropIndexes(PTable table, HRegion region, 
List<ImmutableBytesPtr> invalidateList,
             List<RowLock> locks, long clientTimeStamp, byte[] schemaName,
             byte[] tableName, List<Mutation> additionalTableMetaData, PColumn 
columnToDelete, 
-            List<byte[]> tableNamesToDelete, List<SharedTableState> 
sharedTablesToDelete)
+            List<byte[]> tableNamesToDelete, List<SharedTableState> 
sharedTablesToDelete, int clientVersion)
             throws IOException, SQLException {
         // Look for columnToDelete in any indexes. If found as PK column, get 
lock and drop the
         // index and then invalidate it
@@ -3369,7 +3375,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 additionalTableMetaData.add(new Delete(linkKey, 
clientTimeStamp));
                 doDropTable(indexKey, tenantId, 
index.getSchemaName().getBytes(), index
                         .getTableName().getBytes(), tableName, index.getType(),
-                    additionalTableMetaData, invalidateList, locks, 
tableNamesToDelete, sharedTablesToDelete, false);
+                    additionalTableMetaData, invalidateList, locks, 
tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
             // If the dropped column is a covered index column, invalidate the 
index
@@ -3435,7 +3441,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             }
-            long timeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+            long timeStamp = HConstants.LATEST_TIMESTAMP;
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
             List<Cell> newKVs = 
tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
             Cell newKV = null;
@@ -3447,6 +3453,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                       INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
                   newKV = cell;
                   indexStateKVIndex = index;
+                  timeStamp = cell.getTimestamp();
                 } else if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
                   INDEX_DISABLE_TIMESTAMP_BYTES, 0, 
INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0) {
                   disableTimeStampKVIndex = index;
@@ -3461,7 +3468,6 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             }
             try {
                 Get get = new Get(key);
-                get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp);
                 get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                 get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                 get.addColumn(TABLE_FAMILY_BYTES, 
INDEX_DISABLE_TIMESTAMP_BYTES);
@@ -3481,6 +3487,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 PIndexState currentState =
                         
PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
                                 .getValueOffset()]);
+                // Timestamp of INDEX_STATE gets updated with each call
+                long actualTimestamp = currentStateKV.getTimestamp();
                 long curTimeStampVal = 0;
                 if ((currentDisableTimeStamp != null && 
currentDisableTimeStamp.getValueLength() > 0)) {
                     curTimeStampVal = (Long) 
PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
@@ -3488,32 +3496,28 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     // new DisableTimeStamp is passed in
                     if (disableTimeStampKVIndex >= 0) {
                         Cell newDisableTimeStampCell = 
newKVs.get(disableTimeStampKVIndex);
+                        long expectedTimestamp = 
newDisableTimeStampCell.getTimestamp();
+                        // If the index status has been updated after the 
upper bound of the scan we use
+                        // to partially rebuild the index, then we need to 
fail the rebuild because an
+                        // index write failed before the rebuild was complete.
+                        if (actualTimestamp > expectedTimestamp) {
+                            
builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+                            
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
                         long newDisableTimeStamp = (Long) 
PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                                 newDisableTimeStampCell.getValueOffset(), 
newDisableTimeStampCell.getValueLength());
-                        // We never set the INDEX_DISABLE_TIMESTAMP to a 
positive value when we're setting the state to ACTIVE.
-                        // Instead, we're passing in what we expect the 
INDEX_DISABLE_TIMESTAMP to be currently. If it's
-                        // changed, it means that a data table row failed to 
write while we were partially rebuilding it
-                        // and we must rerun it.
-                        if (newState == PIndexState.ACTIVE && 
newDisableTimeStamp > 0) {
-                            // Don't allow setting to ACTIVE if the 
INDEX_DISABLE_TIMESTAMP doesn't match
-                            // what we expect.
-                            if (newDisableTimeStamp != 
Math.abs(curTimeStampVal)) {
-                                
builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
-                                
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                                done.run(builder.build());
-                                return;
-                           }
-                            // Reset INDEX_DISABLE_TIMESTAMP_BYTES to zero as 
we're good to go.
-                            newKVs.set(disableTimeStampKVIndex, 
-                                    CellUtil.createCell(key, 
TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES, 
-                                            timeStamp, 
KeyValue.Type.Put.getCode(), PLong.INSTANCE.toBytes(0L)));
-                        }
                         // We use the sign of the INDEX_DISABLE_TIMESTAMP to 
differentiate the keep-index-active (negative)
                         // from block-writes-to-data-table case. In either 
case, we want to keep the oldest timestamp to
                         // drive the partial index rebuild rather than update 
it with each attempt to update the index
                         // when a new data table write occurs.
-                        if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) 
< Math.abs(newDisableTimeStamp)) {
-                            // not reset disable timestamp
+                        // We do legitimately move the INDEX_DISABLE_TIMESTAMP 
to be newer when we're rebuilding the
+                        // index in which case the state will be INACTIVE or 
PENDING_ACTIVE.
+                        if (curTimeStampVal != 0 
+                                && (newState == PIndexState.DISABLE || 
newState == PIndexState.PENDING_ACTIVE) 
+                                && Math.abs(curTimeStampVal) < 
Math.abs(newDisableTimeStamp)) {
+                            // do not reset disable timestamp as we want to 
keep the min
                             newKVs.remove(disableTimeStampKVIndex);
                             disableTimeStampKVIndex = -1;
                         }
@@ -3545,29 +3549,42 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 if (currentState == PIndexState.BUILDING && newState != 
PIndexState.ACTIVE) {
                     timeStamp = currentStateKV.getTimestamp();
                 }
-                if ((currentState == PIndexState.UNUSABLE && newState == 
PIndexState.ACTIVE)
-                        || (currentState == PIndexState.ACTIVE && newState == 
PIndexState.UNUSABLE)) {
+                if ((currentState == PIndexState.ACTIVE || currentState == 
PIndexState.PENDING_ACTIVE) && newState == PIndexState.UNUSABLE) {
                     newState = PIndexState.INACTIVE;
                     newKVs.set(indexStateKVIndex, 
KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, 
Bytes.toBytes(newState.getSerializedValue())));
-                } else if (currentState == PIndexState.INACTIVE && newState == 
PIndexState.USABLE) {
-                    newState = PIndexState.ACTIVE;
+                } else if ((currentState == PIndexState.INACTIVE || 
currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.USABLE) {
+                    // Don't allow manual state change to USABLE (i.e. ACTIVE) 
if non zero INDEX_DISABLE_TIMESTAMP
+                    if (curTimeStampVal != 0) {
+                        newState = currentState;
+                    } else {
+                        newState = PIndexState.ACTIVE;
+                    }
                     newKVs.set(indexStateKVIndex, 
KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, 
Bytes.toBytes(newState.getSerializedValue())));
                 }
 
                 PTable returnTable = null;
                 if (currentState != newState || disableTimeStampKVIndex != -1) 
{
+                    // make a copy of tableMetadata so we can add to it
+                    tableMetadata = new ArrayList<Mutation>(tableMetadata);
+                    // Always include the empty column value at latest 
timestamp so
+                    // that clients pull over update.
+                    Put emptyValue = new Put(key);
+                    emptyValue.add(TABLE_FAMILY_BYTES, 
+                            QueryConstants.EMPTY_COLUMN_BYTES, 
+                            HConstants.LATEST_TIMESTAMP, 
+                            QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                    tableMetadata.add(emptyValue);
                     byte[] dataTableKey = null;
-                    if(dataTableKV != null) {
-                        dataTableKey = SchemaUtil.getTableKey(tenantId, 
schemaName, dataTableKV.getValue());
-                    }
-                    if(dataTableKey != null) {
-                        // make a copy of tableMetadata
-                        tableMetadata = new ArrayList<Mutation>(tableMetadata);
+                    if (dataTableKV != null) {
+                        dataTableKey = SchemaUtil.getTableKey(tenantId, 
schemaName, CellUtil.cloneValue(dataTableKV));
                         // insert an empty KV to trigger time stamp update on 
data table row
                         Put p = new Put(dataTableKey);
-                        p.add(TABLE_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, 
QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                        p.add(TABLE_FAMILY_BYTES,
+                                QueryConstants.EMPTY_COLUMN_BYTES,
+                                HConstants.LATEST_TIMESTAMP,
+                                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                         tableMetadata.add(p);
                     }
                     boolean setRowKeyOrderOptimizableCell = newState == 
PIndexState.BUILDING && !rowKeyOrderOptimizable;
@@ -3585,7 +3602,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     }
                     if (setRowKeyOrderOptimizableCell || 
disableTimeStampKVIndex != -1
                             || currentState == PIndexState.DISABLE || newState 
== PIndexState.BUILDING) {
-                        returnTable = doGetTable(key, 
HConstants.LATEST_TIMESTAMP, rowLock);
+                        returnTable = doGetTable(key, 
HConstants.LATEST_TIMESTAMP, rowLock, request.getClientVersion());
                     }
                 }
                 // Get client timeStamp from mutations, since it may get 
updated by the

Reply via email to