PHOENIX-3811 Do not disable index on write failure by default

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b1ddaa2b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b1ddaa2b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b1ddaa2b

Branch: refs/heads/4.x-HBase-1.1
Commit: b1ddaa2b863ee58674053819632fddc13f0dbce1
Parents: 9838dcf
Author: James Taylor <[email protected]>
Authored: Wed May 10 09:52:23 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Thu May 11 17:39:09 2017 -0700
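
With this change a failed index write no longer disables the index by
default: the data table write still succeeds, the client receives a
CommitException carrying the server timestamp of the failed batch, and the
mutations can be replayed against the still-active index. A minimal sketch of
opting back into the old behavior per table, assuming the DDL property
strings match the PhoenixIndexFailurePolicy constant names exercised in
MutableIndexFailureIT below:

    // Hedged illustration only, not part of this commit's diff;
    // jdbcUrl is a placeholder for a Phoenix JDBC connection URL.
    try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
        conn.createStatement().execute(
            "CREATE TABLE T (K VARCHAR NOT NULL PRIMARY KEY, V1 VARCHAR)"
            + " DISABLE_INDEX_ON_WRITE_FAILURE=true,"
            + " REBUILD_INDEX_ON_WRITE_FAILURE=true");
    }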

----------------------------------------------------------------------
 .../phoenix/end2end/AutomaticRebuildIT.java     | 221 --------------
 .../end2end/IndexToolForPartialBuildIT.java     |  15 +-
 ...olForPartialBuildWithNamespaceEnabledIT.java |  15 +-
 .../end2end/index/MutableIndexFailureIT.java    | 276 ++++++++++++------
 .../end2end/index/ReadOnlyIndexFailureIT.java   | 291 -------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +
 .../coprocessor/MetaDataEndpointImpl.java       |   6 +-
 .../coprocessor/MetaDataRegionObserver.java     |  44 ++-
 .../UngroupedAggregateRegionObserver.java       |   7 +
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/CommitException.java |   8 +-
 .../apache/phoenix/execute/MutationState.java   |  13 +-
 .../phoenix/hbase/index/write/IndexWriter.java  |   4 +
 .../write/LeaveIndexActiveFailurePolicy.java    |  62 ++++
 .../index/PhoenixIndexFailurePolicy.java        |  82 +++++-
 .../index/PhoenixTransactionalIndexer.java      |   5 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  44 ++-
 .../phoenix/mapreduce/index/IndexTool.java      |  12 +-
 .../query/ConnectionQueryServicesImpl.java      |   2 +-
 .../org/apache/phoenix/query/QueryServices.java |   3 +-
 .../phoenix/query/QueryServicesOptions.java     |   3 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   7 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  12 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |   5 +
 .../org/apache/phoenix/util/PhoenixRuntime.java |  14 +
 .../org/apache/phoenix/util/ServerUtil.java     |  37 +++
 .../hbase/index/write/TestIndexWriter.java      |   6 +
 28 files changed, 525 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
deleted file mode 100644
index 25cab35..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-
-/**
- * Tests for the {@link AutomaticRebuildIT}
- */
-@RunWith(Parameterized.class)
-public class AutomaticRebuildIT extends BaseOwnClusterIT {
-
-       private final boolean localIndex;
-       protected boolean isNamespaceEnabled = false;
-       protected final String tableDDLOptions;
-
-       public AutomaticRebuildIT(boolean localIndex) {
-               this.localIndex = localIndex;
-               StringBuilder optionBuilder = new StringBuilder();
-               optionBuilder.append(" SPLIT ON(1,2)");
-               this.tableDDLOptions = optionBuilder.toString();
-       }
-
-       @BeforeClass
-       public static void doSetup() throws Exception {
-               Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
-               serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-               serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
-               serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
-               serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-               serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-               serverProps.put("hbase.client.pause", "5000");
-               serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
-               serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
-               Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-               setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
-                               new ReadOnlyProps(clientProps.entrySet().iterator()));
-       }
-
-       @Parameters(name = "localIndex = {0}")
-       public static Collection<Boolean[]> data() {
-               return Arrays.asList(new Boolean[][] { { false }, { true } });
-       }
-
-       @Test
-       public void testSecondaryAutomaticRebuildIndex() throws Exception {
-               String schemaName = generateUniqueName();
-               String dataTableName = generateUniqueName();
-               String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
-               final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
-               props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
-               props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
-               final Connection conn = DriverManager.getConnection(getUrl(), props);
-               Statement stmt = conn.createStatement();
-               try {
-                       if (isNamespaceEnabled) {
-                               conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-                       }
-                       stmt.execute(String.format(
-                                       "CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
-                                       fullTableName, tableDDLOptions));
-                       String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
-                       PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-                       FailingRegionObserver.FAIL_WRITE = false;
-                       // insert two rows
-                       upsertRow(stmt1, 1000);
-                       upsertRow(stmt1, 2000);
-
-                       conn.commit();
-                       stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
-                                       (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
-                       FailingRegionObserver.FAIL_WRITE = true;
-                       upsertRow(stmt1, 3000);
-                       upsertRow(stmt1, 4000);
-                       upsertRow(stmt1, 5000);
-                       try {
-                               conn.commit();
-                               fail();
-                       } catch (SQLException e) {
-                       } catch (Exception e) {
-                       }
-                       FailingRegionObserver.FAIL_WRITE = false;
-                       ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
-                                       new String[] { PTableType.INDEX.toString() });
-                       assertTrue(rs.next());
-                       assertEquals(indxTable, rs.getString(3));
-                       String indexState = rs.getString("INDEX_STATE");
-                       assertEquals(PIndexState.DISABLE.toString(), indexState);
-                       assertFalse(rs.next());
-                       upsertRow(stmt1, 6000);
-                       upsertRow(stmt1, 7000);
-                       conn.commit();
-                       int maxTries = 4, nTries = 0;
-                       boolean isInactive = false;
-                       do {
-                               rs = conn.createStatement()
-                                               .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + ","
-                                                               + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
-                                                               +"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
-                                                               + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
-                                                               + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
-                                                               + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
-                               rs.next();
-                               if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) {
-                                       isInactive = true;
-                                       break;
-                               }
-                               Thread.sleep(10 * 1000); // sleep 10 secs
-                       } while (++nTries < maxTries);
-                       assertTrue(isInactive);
-                       nTries = 0;
-                       boolean isActive = false;
-                       do {
-                               Thread.sleep(15 * 1000); // sleep 15 secs
-                               rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
-                                               new String[] { PTableType.INDEX.toString() });
-                               assertTrue(rs.next());
-                               if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
-                                       isActive = true;
-                                       break;
-                               }
-                       } while (++nTries < maxTries);
-                       assertTrue(isActive);
-
-               } finally {
-                       conn.close();
-               }
-       }
-
-       public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
-               // insert row
-               stmt.setInt(1, i);
-               stmt.setString(2, "uname" + String.valueOf(i));
-               stmt.setInt(3, 95050 + i);
-               stmt.executeUpdate();
-       }
-
-       public static class FailingRegionObserver extends SimpleRegionObserver {
-               public static volatile boolean FAIL_WRITE = false;
-               public static final String INDEX_NAME = "IDX";
-
-               @Override
-               public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-                               MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-                       if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-                               throw new DoNotRetryIOException();
-                       }
-                       Mutation operation = miniBatchOp.getOperation(0);
-                       Set<byte[]> keySet = operation.getFamilyMap().keySet();
-                       for (byte[] family : keySet) {
-                               if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-                                       throw new DoNotRetryIOException();
-                               }
-                       }
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 599e601..59a9106 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -89,9 +89,8 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         this.tableDDLOptions = optionBuilder.toString();
     }
     
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+    public static Map<String, String> getServerProperties() {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
         serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
@@ -99,8 +98,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        serverProps.put(QueryServices.INDEX_FAILURE_DISABLE_INDEX, Boolean.TRUE.toString());
+        return serverProps;
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProperties();
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
     }
     
     @Parameters(name="localIndex = {0}")
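
The hunks above pin this test to the old disable-on-failure behavior through
the new QueryServices.INDEX_FAILURE_DISABLE_INDEX server property. A hedged
sketch of setting the same flag on a server-side Configuration, referencing
the constant rather than its literal key (which this diff does not show):

    // Illustration only; assumes the flag is read from the region server's
    // HBase configuration.
    Configuration conf = HBaseConfiguration.create();
    conf.set(QueryServices.INDEX_FAILURE_DISABLE_INDEX, Boolean.TRUE.toString());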

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
index 4b2371c..a8c1f1e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -21,13 +21,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
@@ -35,7 +31,6 @@ import com.google.common.collect.Maps;
 /**
  * Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
  */
-@RunWith(Parameterized.class)
 public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
     
     
@@ -45,15 +40,9 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor
     }
     
     @BeforeClass
+    @Shadower(classBeingShadowed = IndexToolForPartialBuildIT.class)
     public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
-        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
-        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-        serverProps.put("hbase.client.pause", "5000");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+        Map<String, String> serverProps = getServerProperties();
         serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
         clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index d07c8fa..075f799 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -24,16 +24,17 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,25 +45,33 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 /**
  * 
@@ -72,38 +81,47 @@ import com.google.common.collect.Maps;
  * 
  */
 
+@Ignore("Not working for HBase 1.1")
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
-    public static volatile boolean FAIL_WRITE = false;
     public static final String INDEX_NAME = "IDX";
+    public static final String TABLE_NAME = "T";
+
+    public static volatile boolean FAIL_WRITE = false;
+    public static volatile String fullTableName;
     
     private String tableName;
     private String indexName;
-    private String fullTableName;
     private String fullIndexName;
 
     private final boolean transactional;
     private final boolean localIndex;
     private final String tableDDLOptions;
     private final boolean isNamespaceMapped;
+    private final boolean leaveIndexActiveOnFailure;
+    private final boolean rebuildIndexOnWriteFailure;
     private String schema = generateUniqueName();
+    private List<CommitException> exceptions = Lists.newArrayList();
 
     @AfterClass
     public static void doTeardown() throws Exception {
         tearDownMiniCluster();
     }
 
-    public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) {
+    public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure) {
         this.transactional = transactional;
         this.localIndex = localIndex;
-        this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
-        this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
-                + (isNamespaceMapped ? "_NM" : "");
-        this.indexName = INDEX_NAME;
-        this.fullTableName = SchemaUtil.getTableName(schema, tableName);
+        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "")
+                + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure))
+                + (rebuildIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + rebuildIndexOnWriteFailure));
+        this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
+        this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME;
+        fullTableName = SchemaUtil.getTableName(schema, tableName);
         this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
         this.isNamespaceMapped = isNamespaceMapped;
+        this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure);
+        this.rebuildIndexOnWriteFailure = Boolean.TRUE.equals(rebuildIndexOnWriteFailure);
     }
 
     @BeforeClass
@@ -115,16 +133,30 @@ public class MutableIndexFailureIT extends BaseTest {
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put("data.tx.snapshot.dir", "/tmp");
         serverProps.put("hbase.balancer.period", 
String.valueOf(Integer.MAX_VALUE));
-        Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
Boolean.TRUE.toString());
+        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"4000");
+        Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
-    @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false, false, true }, { false, false, false }, { false, true, true },
-                { false, true, false }, { true, false, true }, { true, true, true }, { true, false, false },
-                { true, true, false } });
+    @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4}") // name is used by failsafe as file name in reports
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[][] { 
+                { false, false, true, true, true }, 
+                { false, false, false, true, true }, 
+                { true, false, false, true, true }, 
+                { true, false, true, true, true },
+                { false, true, true, true, true }, 
+                { false, true, false, true, true }, 
+                { true, true, false, true, true }, 
+                { true, true, true, true, true },
+
+                { false, false, false, null, true }, 
+                { false, true, false, false, true }, 
+                { false, false, false, false, null }, 
+        } 
+        );
     }
 
     @Test
@@ -133,9 +165,9 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public void helpTestWriteFailureDisablesIndex() throws Exception {
-        String secondTableName = fullTableName + "_2";
-        String secondIndexName = indexName + "_2";
-        String secondFullIndexName = fullIndexName + "_2";
+        String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
+//        String thirdIndexName = "C_" + INDEX_NAME;
+//        String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
         try (Connection conn = driver.connect(url, props)) {
@@ -147,24 +179,26 @@ public class MutableIndexFailureIT extends BaseTest {
             }
             conn.createStatement().execute("CREATE TABLE " + fullTableName
                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR) " + tableDDLOptions);
-            conn.createStatement().execute("CREATE TABLE " + secondTableName
-                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR) " + tableDDLOptions);
             query = "SELECT * FROM " + fullTableName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             FailingRegionObserver.FAIL_WRITE = false;
             conn.createStatement().execute(
-                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            // Create other index which should be local/global if the other index is global/local to
+            // check the drop index.
             conn.createStatement().execute(
-                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
+                    "CREATE "  + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
+//            conn.createStatement().execute(
+//                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
 
             query = "SELECT * FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%",
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null,
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
@@ -172,14 +206,15 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(secondIndexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
+//            assertTrue(rs.next());
+//            assertEquals(thirdIndexName, rs.getString(3));
+//            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             initializeTable(conn, fullTableName);
-            initializeTable(conn, secondTableName);
             
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
-                    + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped);
+            String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+                    + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
             assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
             rs = conn.createStatement().executeQuery(query);
             assertTrue(rs.next());
@@ -194,15 +229,14 @@ public class MutableIndexFailureIT extends BaseTest {
             assertFalse(rs.next());
 
             FailingRegionObserver.FAIL_WRITE = true;
-            updateTable(conn, fullTableName);
-            updateTable(conn, secondTableName);
+            updateTable(conn, true);
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table write failure
-            if (transactional) {
+            if (transactional || leaveIndexActiveOnFailure) {
                 assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             } else {
                 String indexState = rs.getString("INDEX_STATE");
@@ -214,24 +248,12 @@ public class MutableIndexFailureIT extends BaseTest {
             // in an all or none manner. If the table is not transactional, then the data writes
             // would have succeeded while the index writes would have failed.
             if (!transactional) {
-                // Verify UPSERT on data table still work after index is disabled
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-                stmt.setString(1, "a3");
-                stmt.setString(2, "x3");
-                stmt.setString(3, "3");
-                stmt.execute();
-                conn.commit();
-                stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
-                stmt.setString(1, "a3");
-                stmt.setString(2, "x3");
-                stmt.setString(3, "3");
-                stmt.execute();
-                conn.commit();
+                updateTableAgain(conn, leaveIndexActiveOnFailure);
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-                expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
-                        + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped);
+                expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+                        + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
                 assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
                 rs = conn.createStatement().executeQuery(query);
                 assertTrue(rs.next());
@@ -248,13 +270,20 @@ public class MutableIndexFailureIT extends BaseTest {
                 assertEquals("d", rs.getString(2));
                 assertFalse(rs.next());
             }
+            // Comment back in when PHOENIX-3815 is fixed
+//            validateDataWithIndex(conn, fullTableName, thirdFullIndexName, false);
 
             // re-enable index table
             FailingRegionObserver.FAIL_WRITE = false;
-            waitForIndexToBeActive(conn,indexName);
-            waitForIndexToBeActive(conn,secondIndexName);
+            if (rebuildIndexOnWriteFailure) {
+                // wait for index to be rebuilt automatically
+                waitForIndexToBeRebuilt(conn,indexName);
+            } else {
+                // simulate replaying failed mutation
+                replayMutations();
+            }
 
-            // Verify UPSERT on data table still work after index table is recreated
+            // Verify UPSERT on data table still works after index table is recreated
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
             stmt.setString(1, "a3");
             stmt.setString(2, "x4");
@@ -262,31 +291,26 @@ public class MutableIndexFailureIT extends BaseTest {
             stmt.execute();
             conn.commit();
             
-            stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "a3");
-            stmt.setString(2, "x4");
-            stmt.setString(3, "4");
-            stmt.execute();
-            conn.commit();
-
-            // verify index table has correct data
-            validateDataWithIndex(conn, fullTableName, fullIndexName);
-            validateDataWithIndex(conn, secondTableName, secondFullIndexName);
+            // verify index table has correct data (note that second index has been dropped)
+            validateDataWithIndex(conn, fullTableName, fullIndexName, localIndex);
         } finally {
             FAIL_WRITE = false;
         }
     }
 
-    private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException {
+    private void waitForIndexToBeRebuilt(Connection conn, String index) throws InterruptedException, SQLException {
         boolean isActive = false;
         if (!transactional) {
-            int maxTries = 4, nTries = 0;
+            int maxTries = 12, nTries = 0;
             do {
-                Thread.sleep(15 * 1000); // sleep 15 secs
-                ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index,
-                        new String[] { PTableType.INDEX.toString() });
+                Thread.sleep(5 * 1000); // sleep 5 secs
+                String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " +
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+                        + ") = (" + "'" + schema + "','" + index + "') "
+                        + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+                ResultSet rs = conn.createStatement().executeQuery(query);
                 assertTrue(rs.next());
-                if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+                if (rs.getLong(1) == 0 && !rs.wasNull()) {
                     isActive = true;
                     break;
                 }
@@ -313,16 +337,16 @@ public class MutableIndexFailureIT extends BaseTest {
 
     }
 
-    private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException {
-        String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName;
+    private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName, boolean localIndex) throws SQLException {
+        String query = "SELECT /*+ INDEX(" + fullTableName + " " + SchemaUtil.getTableNameFromFullName(fullIndexName) + ")  */ k,v1 FROM " + fullTableName;
         ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         String expectedPlan = " OVER "
                 + (localIndex
                         ? Bytes.toString(
-                                SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName())
-                        : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString());
+                                SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName())
+                        : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString());
         String explainPlan = QueryUtil.getExplainPlan(rs);
-        assertTrue(explainPlan.contains(expectedPlan));
+        assertTrue(explainPlan, explainPlan.contains(expectedPlan));
         rs = conn.createStatement().executeQuery(query);
         if (transactional) { // failed commit does not get retried
             assertTrue(rs.next());
@@ -355,8 +379,26 @@ public class MutableIndexFailureIT extends BaseTest {
         }
     }
     
-    private void updateTable(Connection conn, String tableName) throws SQLException {
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+    private void replayMutations() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        for (int i = 0; i < exceptions.size(); i++) {
+            CommitException e = exceptions.get(i);
+            long ts = e.getServerTimestamp();
+            props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(ts));
+            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                if (i == 0) {
+                    updateTable(conn, false);
+                } else if (i == 1) {
+                    updateTableAgain(conn, false);
+                } else {
+                    fail();
+                }
+            }
+        }
+    }
+    
+    private void updateTable(Connection conn, boolean commitShouldFail) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
         // Insert new row
         stmt.setString(1, "d");
         stmt.setString(2, "d");
@@ -368,36 +410,92 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.setString(3, "2");
         stmt.execute();
         // Delete existing row
-        stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE 
k=?");
+        stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE 
k=?");
         stmt.setString(1, "b");
         stmt.execute();
         try {
             conn.commit();
-            fail();
-        } catch (SQLException e) {
-            System.out.println();
-        } catch (Exception e) {
-            System.out.println();
+            if (commitShouldFail) {
+                fail();
+            }
+        } catch (CommitException e) {
+            if (!commitShouldFail) {
+                throw e;
+            }
+            exceptions.add(e);
         }
 
     }
 
+    private void updateTableAgain(Connection conn, boolean commitShouldFail) throws SQLException {
+        // Verify UPSERT on data table still work after index is disabled
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+        stmt.setString(1, "a3");
+        stmt.setString(2, "x3");
+        stmt.setString(3, "3");
+        stmt.execute();
+        try {
+            conn.commit();
+            if (commitShouldFail) {
+                fail();
+            }
+        } catch (CommitException e) {
+            if (!commitShouldFail) {
+                throw e;
+            }
+            exceptions.add(e);
+        }
+    }
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         public static volatile boolean FAIL_WRITE = false;
-        public static final String INDEX_NAME = "IDX";
+        public static final String FAIL_INDEX_NAME = "FAIL_IDX";
+        public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
+
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-                throw new DoNotRetryIOException();
-            }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-                    throw new DoNotRetryIOException();
+            boolean throwException = false;
+            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
+                    && FAIL_WRITE) {
+                throwException = true;
+            } else {
+                // When local index updates are atomic with data updates, testing a write failure to a local
+                // index won't make sense.
+                Mutation operation = miniBatchOp.getOperation(0);
+                if (FAIL_WRITE) {
+                    Map<byte[],List<Cell>>cellMap = operation.getFamilyCellMap();
+                    for (Map.Entry<byte[],List<Cell>> entry : cellMap.entrySet()) {
+                        byte[] family = entry.getKey();
+                        if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                            int regionStartKeyLen = c.getEnvironment().getRegionInfo().getStartKey().length;
+                            Cell firstCell = entry.getValue().get(0);
+                            short indexId = MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(firstCell.getRowArray(), firstCell.getRowOffset() + regionStartKeyLen, SortOrder.getDefault());
+                            // Only throw for first local index as the test may have multiple local indexes
+                            if (indexId == Short.MIN_VALUE) {
+                                throwException = true;
+                                break;
+                            }
+                        }
+                    }
                 }
             }
+            if (throwException) {
+                dropIndex(c);
+                throw new DoNotRetryIOException();
+            }
         }
+
+         private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> c) {
+             try {
+                 Connection connection =
+                         QueryUtil.getConnection(c.getEnvironment().getConfiguration());
+                 connection.createStatement().execute(
+                        "DROP INDEX IF EXISTS " + "B_" + FAIL_INDEX_NAME + " ON "
+                             + fullTableName);
+             } catch (ClassNotFoundException e) {
+             } catch (SQLException e) {
+             }
+         }
     }
 
 }
\ No newline at end of file
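
The replayMutations() helper above demonstrates the client-side recovery path
this commit enables: CommitException now exposes the server timestamp of the
failed batch, and a connection opened with PhoenixRuntime.REPLAY_AT_ATTRIB
set to that timestamp re-applies the mutations. A condensed sketch, with
names taken from the test and intended as illustration only:

    try {
        conn.commit();
    } catch (CommitException e) {
        // Index was left ACTIVE; replay the pending mutations at the
        // timestamp of the failed batch.
        Properties props = new Properties();
        props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB,
                Long.toString(e.getServerTimestamp()));
        try (Connection replayConn = DriverManager.getConnection(url, props)) {
            // Re-execute the failed upserts/deletes here, then commit.
        }
    }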

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
deleted file mode 100644
index cf3cb29..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.BaseOwnClusterIT;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-/**
- * 
- * Test for failure of region server to write to index table.
- * For some reason dropping tables after running this test
- * fails unless it runs its own mini cluster. 
- * 
- * 
- * @since 2.1
- */
-
-@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class ReadOnlyIndexFailureIT extends BaseOwnClusterIT {
-    public static volatile boolean FAIL_WRITE = false;
-    public static final String INDEX_NAME = "IDX";
-
-    private String tableName;
-    private String indexName;
-    private String fullTableName;
-    private String fullIndexName;
-    private final boolean localIndex;
-
-    public ReadOnlyIndexFailureIT(boolean localIndex) {
-        this.localIndex = localIndex;
-        this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
-        this.indexName = INDEX_NAME;
-        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-    }
-
-    @Parameters(name = "ReadOnlyIndexFailureIT_localIndex={0}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false }, { true } });
-    }
-
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
-        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-        serverProps.put("hbase.client.pause", "5000");
-        serverProps.put("hbase.balancer.period", 
String.valueOf(Integer.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
"true");
-        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"1000");
-        serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
-        serverProps.put("hbase.coprocessor.abortonerror", "false");
-        serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
-        NUM_SLAVES_BASE = 4;
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
-                ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @Test
-    public void testWriteFailureReadOnlyIndex() throws Exception {
-        helpTestWriteFailureReadOnlyIndex();
-    }
-
-    public void helpTestWriteFailureReadOnlyIndex() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = driver.connect(url, props)) {
-            String query;
-            ResultSet rs;
-            conn.setAutoCommit(false);
-            conn.createStatement().execute(
-                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-            query = "SELECT * FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            FAIL_WRITE = false;
-            if(localIndex) {
-                conn.createStatement().execute(
-                        "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName 
-                        + " (v1) INCLUDE (v2)");
-            } else {
-                conn.createStatement().execute(
-                        "CREATE INDEX " + indexName + " ON " + fullTableName 
-                        + " (v1) INCLUDE (v2)");
-            }
-
-            query = "SELECT * FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
-                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName 
-                    + " VALUES(?,?,?)");
-            stmt.setString(1, "1");
-            stmt.setString(2, "aaa");
-            stmt.setString(3, "a1");
-            stmt.execute();
-            conn.commit();
-
-            FAIL_WRITE = true;
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "2");
-            stmt.setString(2, "bbb");
-            stmt.setString(3, "b2");
-            stmt.execute();
-            try {
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-            }
-
-            // Only successfully committed row should be seen
-            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertFalse(rs.next());
-            
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
-                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            // the index is always active for tables upon index table write failure
-            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            // if the table is transactional the write to the index table will fail because the
-            // index has not been disabled
-            // Verify UPSERT on data table is blocked  after index write failed
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "3");
-            stmt.setString(2, "ccc");
-            stmt.setString(3, "3c");
-            try {
-                stmt.execute();
-                /* Writes would be blocked */
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-                assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
-            }
-
-            FAIL_WRITE = false;
-            // Second attempt at writing will succeed
-            int retries = 0;
-            do {
-                Thread.sleep(5 * 1000); // sleep 5 secs
-                if(!hasIndexDisableTimestamp(conn, indexName)){
-                    break;
-                }
-                if (++retries == 5) {
-                    fail("Failed to rebuild index with allowed time");
-                }
-            } while(true);
-
-            // Verify UPSERT on data table still works after index table is recreated
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "4");
-            stmt.setString(2, "ddd");
-            stmt.setString(3, "4d");
-            stmt.execute();
-            conn.commit();
-
-            // verify index table has data
-            query = "SELECT count(1) FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            
-            query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bbb", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("ddd", rs.getString(1));
-            assertFalse(rs.next());
-
-            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bbb", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("ddd", rs.getString(1));
-            assertFalse(rs.next());
-        }
-    }
-    
-    private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException {
-        ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
-                " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'");
-        assertTrue(rs.next());
-        long ts = rs.getLong(1);
-        return (!rs.wasNull() && ts > 0);
-    }
-
-    
-    public static class FailingRegionObserver extends SimpleRegionObserver {
-        @Override
-        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-                throw new DoNotRetryIOException();
-            }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-                    throw new DoNotRetryIOException();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index cee545a..fe9be6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -533,6 +534,10 @@ public class DeleteCompiler {
             } else if (runOnServer) {
                 // TODO: better abstraction
                 Scan scan = context.getScan();
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                // future dated data row mutations that will get in the way of generating the
+                // correct index rows on replay.
+                scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                 scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
 
                 // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index bbbd483..e5307d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -713,6 +713,10 @@ public class UpsertCompiler {
                      */
                     final StatementContext context = queryPlan.getContext();
                     final Scan scan = context.getScan();
+                    // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                    // future dated data row mutations that will get in the way of generating the
+                    // correct index rows on replay.
+                    scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     

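Both compilers carry the replay flag on the Scan itself rather than through a separate RPC parameter. A minimal standalone sketch of the pattern follows; the attribute name is an illustrative stand-in for the BaseScannerRegionObserver constant, and only the attribute's presence matters on the server:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanAttributeSketch {
        // Stand-in name for BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS
        static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";

        static Scan tagForReplay(Scan scan) {
            // Any non-null value works; the coprocessor only checks for presence.
            scan.setAttribute(IGNORE_NEWER_MUTATIONS, Bytes.toBytes(true));
            return scan;
        }

        static boolean isReplay(Scan scan) {
            return scan.getAttribute(IGNORE_NEWER_MUTATIONS) != null;
        }
    }
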
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 800b8a1..a02f4bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3418,7 +3418,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
                     long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                             newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
-                    if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){
+                    // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
+                    // case from the block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
+                    // drive the partial index rebuild rather than update it with each attempt to update the index
+                    // when a new data table write occurs.
+                    if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
                         // not reset disable timestamp
                         newKVs.remove(disableTimeStampKVIndex);
                         disableTimeStampKVIndex = -1;

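The rewritten guard above is the heart of the new sign convention: a negative INDEX_DISABLE_TIMESTAMP means the index is left active on write failure, a positive one means writes to the data table are blocked, and in both cases only the magnitude decides which failure time is kept. A hedged sketch of just that comparison:

    public class DisableTimestampSign {
        /** True if the currently stored timestamp should be kept over the proposed one. */
        static boolean keepCurrent(long current, long proposed) {
            // Keep the oldest failure time (by magnitude) so the partial index
            // rebuild replays from the first failed write, whatever the sign encodes.
            return current != 0 && Math.abs(current) < Math.abs(proposed);
        }
    }
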
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 9482d37..ce42de6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -74,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
@@ -100,7 +98,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
-    private boolean blockWriteRebuildIndex = false;
     private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
     @Override
@@ -128,8 +125,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
         rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
-        blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
-               QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         
     }
     
@@ -172,7 +167,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         t.setDaemon(true);
         t.start();
 
-        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
+        if (!enableRebuildIndex) {
             LOG.info("Failure Index Rebuild is skipped by configuration.");
             return;
         }
@@ -229,7 +224,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 Scan scan = new Scan();
                 SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                    CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L));
+                    CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
                 filter.setFilterIfMissing(true);
                 scan.setFilter(filter);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
@@ -240,10 +235,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                PreparedStatement updateDisabledTimeStampSmt = null;
 
                 Map<PTable, List<PTable>> dataTableToIndexesMap = null;
-                MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
                 scanner = this.env.getRegion().getScanner(scan);
@@ -259,17 +252,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                             PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
 
-                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
-                            && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
-                        // Don't rebuild the building index, because they are marked for async
+                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                         continue;
                     }
 
-                    // disableTimeStamp has to be a positive value
-                    long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
-                    if (disabledTimeStampVal <= 0) {
-                        continue;
-                    }
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                     if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
@@ -302,7 +288,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                        // don't run a second index population upsert select
                         props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
                         conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
-                        client = new MetaDataClient(conn);
                         dataTableToIndexesMap = Maps.newHashMap();
                     }
                     String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -331,7 +316,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
                     }
                     LOG.debug("We have found " + indexPTable.getIndexState() + " Index:" + indexPTable.getName()
-                            + " on data table:" + dataPTable.getName() + " which was disabled at "
+                            + " on data table:" + dataPTable.getName() + " which failed to be updated at "
                             + indexPTable.getIndexDisableTimestamp());
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
@@ -349,9 +334,22 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                                                        long earliestDisableTimestamp = Long.MAX_VALUE;
                                                        List<IndexMaintainer> maintainers = Lists
                                                                        .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                                                       int signOfDisableTimeStamp = 0;
                                                        for (PTable index : indexesToPartiallyRebuild) {
+                                                   // We need a way of differentiating the block writes to data table case from
+                                                   // the leave index active case. In either case, we need to know the time stamp
+                                                   // at which writes started failing so we can rebuild from that point. If we
+                                                   // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+                                                   // then writes to the data table will be blocked (this is client side logic
+                                                   // and we can't change this in a minor release). So we use the sign of the
+                                                   // time stamp to differentiate.
                                                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
-                                                               if (disabledTimeStampVal > 0) {
+                                                               if (disabledTimeStampVal != 0) {
+                                    if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) {
+                                        LOG.warn("Found unexpected mix of signs with INDEX_DISABLE_TIMESTAMP for " + dataPTable.getName().getString() + " with " + indexesToPartiallyRebuild);
+                                    }
+                                                                   signOfDisableTimeStamp = Long.signum(disabledTimeStampVal);
+                                       disabledTimeStampVal = Math.abs(disabledTimeStampVal);
                                                                        if (disabledTimeStampVal < earliestDisableTimestamp) {
                                                                                earliestDisableTimestamp = disabledTimeStampVal;
                                                                        }
@@ -409,8 +407,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                                                                        batchExecutedPerTableMap.remove(dataPTable.getName());
                                     LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
                                                                } else {
-
-                                                                       updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+                                                                   // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above)
+                                                                       updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime * signOfDisableTimeStamp, metaTable);
                                                                        Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
                                                                        if (noOfBatches == null) {
                                                                                noOfBatches = 0l;
@@ -507,7 +505,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                                PLong.INSTANCE.toBytes(disabledTimestamp));
                metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                               PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(),
+                               PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0),
                                put);
 
        }

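To make the sign bookkeeping in the rebuild loop concrete, here is a self-contained sketch with hypothetical timestamp values; it mirrors the logic above without any of the HBase plumbing:

    public class RebuildWindowSketch {
        public static void main(String[] args) {
            // Hypothetical INDEX_DISABLE_TIMESTAMP values for two indexes on one
            // data table; negative values encode the leave-index-active policy.
            long[] disableTimestamps = { -1494430000500L, -1494430000100L };
            long earliestDisableTimestamp = Long.MAX_VALUE;
            int signOfDisableTimeStamp = 0;
            for (long ts : disableTimestamps) {
                if (ts == 0) continue; // no pending failure for this index
                if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(ts)) {
                    System.err.println("Unexpected mix of signs");
                }
                signOfDisableTimeStamp = Long.signum(ts);
                long abs = Math.abs(ts);
                if (abs < earliestDisableTimestamp) {
                    earliestDisableTimestamp = abs;
                }
            }
            // The rebuild scan starts at the earliest magnitude; the sign is
            // reapplied when the watermark is advanced (scanEndTime * sign).
            System.out.println("rebuild from " + earliestDisableTimestamp
                    + ", sign " + signOfDisableTimeStamp);
        }
    }
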
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 49ef884..a056807 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -379,6 +379,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         
         RegionScanner theScanner = s;
         
+        boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
         List<Expression> selectExpressions = null;
@@ -610,6 +611,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             Cell firstKV = results.get(0);
                             Delete delete = new Delete(firstKV.getRowArray(),
                                 firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+                            if (replayMutations) {
+                                delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                            }
                             mutations.add(delete);
                             // force tephra to ignore this deletes
                             delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -661,6 +665,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 }
                             }
                             for (Mutation mutation : row.toRowMutations()) {
+                                if (replayMutations) {
+                                    mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                }
                                 mutations.add(mutation);
                             }
                             for (i = 0; i < selectExpressions.size(); i++) {

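On the server side the flag has to survive the hop from the incoming Scan to the mutations the observer synthesizes, which is what the two hunks above do for Deletes and row mutations. A minimal sketch of that copy, again with a stand-in attribute name:

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReplayMarkerSketch {
        // Stand-in name for BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS
        static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";

        static void copyMarker(Scan scan, Mutation mutation) {
            // Carry the replay flag from the scan onto every generated mutation.
            if (scan.getAttribute(IGNORE_NEWER_MUTATIONS) != null) {
                mutation.setAttribute(IGNORE_NEWER_MUTATIONS, Bytes.toBytes(true));
            }
        }
    }
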
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2836c45..35ba187 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -182,6 +182,8 @@ public enum SQLExceptionCode {
     ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP."),
     ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."),
     INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."),
+     INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
+     UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."),
      /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.

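The two new codes guard the replay-at connection property introduced elsewhere in this commit: the value must be non-negative, and if CURRENT_SCN is also set the two must match. A hedged JDBC sketch of that constraint; "CurrentSCN" is the long-standing Phoenix property, while "ReplayAt" is an assumed name for the new property (see the PhoenixRuntime changes in this commit for the actual constant):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class ReplayAtSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("CurrentSCN", "1000");
            // Assumed property name; must be >= 0 and equal to CurrentSCN when both
            // are set, otherwise INVALID_REPLAY_AT or UNEQUAL_SCN_AND_REPLAY_AT is raised.
            props.setProperty("ReplayAt", "1000");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                // mutations committed on this connection replay as of the given timestamp
            }
        }
    }
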
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..b0d22d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -24,10 +24,16 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 public class CommitException extends SQLException {
     private static final long serialVersionUID = 2L;
     private final int[] uncommittedStatementIndexes;
+    private final long serverTimestamp;
 
-    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+    public CommitException(Exception e, int[] uncommittedStatementIndexes, long serverTimestamp) {
         super(e);
         this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+        this.serverTimestamp = serverTimestamp;
+    }
+    
+    public long getServerTimestamp() {
+        return this.serverTimestamp;
     }
 
     /**

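A short usage sketch for the widened constructor: a caller that catches a CommitException can now read back the server timestamp of the failed batch (HConstants.LATEST_TIMESTAMP when none could be parsed, per the MutationState change below). The statement-index accessor is pre-existing:

    import org.apache.phoenix.execute.CommitException;

    public class CommitFailureHandling {
        static void onCommitFailure(CommitException e) {
            long serverTs = e.getServerTimestamp();
            int[] failed = e.getUncommittedStatementIndexes();
            // e.g. reconnect "as of" serverTs and replay the uncommitted statements
            System.err.println("batch failed at server ts " + serverTs
                    + ", statements: " + java.util.Arrays.toString(failed));
        }
    }
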
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d32199b..6144c7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -82,6 +82,7 @@ import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -673,6 +674,14 @@ public class MutationState implements SQLCloseable {
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
+            if (connection.isReplayMutations()) {
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                // future dated data row mutations that will get in the way of generating the
+                // correct index rows on replay.
+                for (Mutation mutation : rowMutations) {
+                    mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                }
+            }
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
                     .addAll(rowMutationsPertainingToIndex);
         }
@@ -1030,6 +1039,7 @@ public class MutationState implements SQLCloseable {
                     joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
                 }
             }
+            long serverTimestamp = HConstants.LATEST_TIMESTAMP;
             Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
             while (mutationsIterator.hasNext()) {
                 Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
@@ -1106,6 +1116,7 @@ public class MutationState implements SQLCloseable {
                         // Remove batches as we process them
                         mutations.remove(origTableRef);
                     } catch (Exception e) {
+                        serverTimestamp = ServerUtil.parseServerTimestamp(e);
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
                             if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
@@ -1127,7 +1138,7 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client an exception that indicates the statements that
                         // were not committed successfully.
-                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp);
                     } finally {
                         try {
                             if (cache!=null) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 831aa16..a037e92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -61,6 +61,10 @@ public class IndexWriter implements Stoppable {
     this(getCommitter(env), getFailurePolicy(env), env, name);
   }
 
+  public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
+      this(getCommitter(env), failurePolicy, env, name);
+    }
+
   public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
     Configuration conf = env.getConfiguration();
     try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
new file mode 100644
index 0000000..edacd3a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.util.ServerUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * 
+ * Implementation of IndexFailurePolicy which takes no action when an
+ * index cannot be updated. As with the standard flow of control, an
+ * exception will still be thrown back to the client. Using this failure
+ * policy means that the action to take upon failure is completely up
+ * to the client.
+ *
+ */
+public class LeaveIndexActiveFailurePolicy implements IndexFailurePolicy {
+
+    @Override
+    public boolean isStopped() {
+        return false;
+    }
+
+    @Override
+    public void stop(String arg0) {
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+            throws IOException {
+        // get timestamp of first cell
+        long ts = attempted.values().iterator().next().getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+        throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, ts);
+    }
+
+}

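A hedged sketch of how this policy pairs with the IndexWriter constructor added earlier in the commit; the writer name is arbitrary and the call would typically come from a coprocessor holding its region environment:

    import java.io.IOException;

    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.phoenix.hbase.index.write.IndexWriter;
    import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;

    public class LeaveActiveWiring {
        static IndexWriter newWriter(RegionCoprocessorEnvironment env) throws IOException {
            // Failures surface to the client as DoNotRetryIOExceptions carrying the
            // cell timestamp, while the index itself stays ACTIVE.
            return new IndexWriter(new LeaveIndexActiveFailurePolicy(), env, "recovery-index-writer");
        }
    }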