PHOENIX-3811 Do not disable index on write failure by default

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cd5ab4fb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cd5ab4fb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cd5ab4fb

Branch: refs/heads/4.x-HBase-0.98
Commit: cd5ab4fbc13a1db743aa89a0008d60035219632c
Parents: 4beb182
Author: James Taylor <[email protected]>
Authored: Wed May 10 09:52:23 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Thu May 11 18:05:24 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/AutomaticRebuildIT.java     | 221 ----------
 .../end2end/IndexToolForPartialBuildIT.java     |  15 +-
 ...olForPartialBuildWithNamespaceEnabledIT.java |  15 +-
 .../end2end/index/MutableIndexFailureIT.java    | 257 +++++++----
 .../end2end/index/ReadOnlyIndexFailureIT.java   | 291 -------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +
 .../coprocessor/MetaDataEndpointImpl.java       |   6 +-
 .../coprocessor/MetaDataRegionObserver.java     | 422 ++++++++++---------
 .../UngroupedAggregateRegionObserver.java       |   7 +
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/CommitException.java |   8 +-
 .../apache/phoenix/execute/MutationState.java   |  13 +-
 .../phoenix/hbase/index/write/IndexWriter.java  |   4 +
 .../write/LeaveIndexActiveFailurePolicy.java    |  62 +++
 .../index/PhoenixIndexFailurePolicy.java        |  82 +++-
 .../index/PhoenixTransactionalIndexer.java      |   5 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  44 +-
 .../phoenix/mapreduce/index/IndexTool.java      |  12 +-
 .../query/ConnectionQueryServicesImpl.java      |   1 +
 .../org/apache/phoenix/query/QueryServices.java |   3 +-
 .../phoenix/query/QueryServicesOptions.java     |   3 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   6 +
 .../java/org/apache/phoenix/util/IndexUtil.java |  12 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |   5 +
 .../org/apache/phoenix/util/PhoenixRuntime.java |  14 +
 .../org/apache/phoenix/util/ServerUtil.java     |  37 ++
 .../hbase/index/write/TestIndexWriter.java      |   6 +
 28 files changed, 698 insertions(+), 864 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
deleted file mode 100644
index 25cab35..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-
-/**
- * Tests for the {@link AutomaticRebuildIT}
- */
-@RunWith(Parameterized.class)
-public class AutomaticRebuildIT extends BaseOwnClusterIT {
-
-       private final boolean localIndex;
-       protected boolean isNamespaceEnabled = false;
-       protected final String tableDDLOptions;
-
-       public AutomaticRebuildIT(boolean localIndex) {
-               this.localIndex = localIndex;
-               StringBuilder optionBuilder = new StringBuilder();
-               optionBuilder.append(" SPLIT ON(1,2)");
-               this.tableDDLOptions = optionBuilder.toString();
-       }
-
-       @BeforeClass
-       public static void doSetup() throws Exception {
-               Map<String, String> serverProps = 
Maps.newHashMapWithExpectedSize(7);
-               serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, 
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-               serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
-               serverProps.put(" 
yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
-               serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-               serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-               serverProps.put("hbase.client.pause", "5000");
-               
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
-               
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE,
 "5");
-               Map<String, String> clientProps = 
Maps.newHashMapWithExpectedSize(1);
-               setUpTestDriver(new 
ReadOnlyProps(serverProps.entrySet().iterator()),
-                               new 
ReadOnlyProps(clientProps.entrySet().iterator()));
-       }
-
-       @Parameters(name = "localIndex = {0}")
-       public static Collection<Boolean[]> data() {
-               return Arrays.asList(new Boolean[][] { { false }, { true } });
-       }
-
-       @Test
-       public void testSecondaryAutomaticRebuildIndex() throws Exception {
-               String schemaName = generateUniqueName();
-               String dataTableName = generateUniqueName();
-               String fullTableName = SchemaUtil.getTableName(schemaName, 
dataTableName);
-               final String indxTable = String.format("%s_%s", dataTableName, 
FailingRegionObserver.INDEX_NAME);
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               props.setProperty(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
-               props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, 
Boolean.FALSE.toString());
-               props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, 
Boolean.toString(isNamespaceEnabled));
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               Statement stmt = conn.createStatement();
-               try {
-                       if (isNamespaceEnabled) {
-                               conn.createStatement().execute("CREATE SCHEMA 
IF NOT EXISTS " + schemaName);
-                       }
-                       stmt.execute(String.format(
-                                       "CREATE TABLE %s (ID BIGINT NOT NULL, 
NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
-                                       fullTableName, tableDDLOptions));
-                       String upsertQuery = String.format("UPSERT INTO %s 
VALUES(?, ?, ?)", fullTableName);
-                       PreparedStatement stmt1 = 
conn.prepareStatement(upsertQuery);
-                       FailingRegionObserver.FAIL_WRITE = false;
-                       // insert two rows
-                       upsertRow(stmt1, 1000);
-                       upsertRow(stmt1, 2000);
-
-                       conn.commit();
-                       stmt.execute(String.format("CREATE %s INDEX %s ON %s  
(LPAD(UPPER(NAME),11,'x')||'_xyz') ",
-                                       (localIndex ? "LOCAL" : ""), indxTable, 
fullTableName));
-                       FailingRegionObserver.FAIL_WRITE = true;
-                       upsertRow(stmt1, 3000);
-                       upsertRow(stmt1, 4000);
-                       upsertRow(stmt1, 5000);
-                       try {
-                               conn.commit();
-                               fail();
-                       } catch (SQLException e) {
-                       } catch (Exception e) {
-                       }
-                       FailingRegionObserver.FAIL_WRITE = false;
-                       ResultSet rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schemaName), indxTable,
-                                       new String[] { 
PTableType.INDEX.toString() });
-                       assertTrue(rs.next());
-                       assertEquals(indxTable, rs.getString(3));
-                       String indexState = rs.getString("INDEX_STATE");
-                       assertEquals(PIndexState.DISABLE.toString(), 
indexState);
-                       assertFalse(rs.next());
-                       upsertRow(stmt1, 6000);
-                       upsertRow(stmt1, 7000);
-                       conn.commit();
-                       int maxTries = 4, nTries = 0;
-                       boolean isInactive = false;
-                       do {
-                               rs = conn.createStatement()
-                                               
.executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + 
","
-                                                               + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
-                                                               +"\""+ 
SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
-                                                               + 
PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
-                                                               + 
PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
-                                                               + 
PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
-                               rs.next();
-                               if 
(PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && 
rs.getLong(2) > 3000) {
-                                       isInactive = true;
-                                       break;
-                               }
-                               Thread.sleep(10 * 1000); // sleep 10 secs
-                       } while (++nTries < maxTries);
-                       assertTrue(isInactive);
-                       nTries = 0;
-                       boolean isActive = false;
-                       do {
-                               Thread.sleep(15 * 1000); // sleep 15 secs
-                               rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schemaName), indxTable,
-                                               new String[] { 
PTableType.INDEX.toString() });
-                               assertTrue(rs.next());
-                               if 
(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
-                                       isActive = true;
-                                       break;
-                               }
-                       } while (++nTries < maxTries);
-                       assertTrue(isActive);
-
-               } finally {
-                       conn.close();
-               }
-       }
-
-       public static void upsertRow(PreparedStatement stmt, int i) throws 
SQLException {
-               // insert row
-               stmt.setInt(1, i);
-               stmt.setString(2, "uname" + String.valueOf(i));
-               stmt.setInt(3, 95050 + i);
-               stmt.executeUpdate();
-       }
-
-       public static class FailingRegionObserver extends SimpleRegionObserver {
-               public static volatile boolean FAIL_WRITE = false;
-               public static final String INDEX_NAME = "IDX";
-
-               @Override
-               public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-                               MiniBatchOperationInProgress<Mutation> 
miniBatchOp) throws HBaseIOException {
-                       if 
(c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME)
 && FAIL_WRITE) {
-                               throw new DoNotRetryIOException();
-                       }
-                       Mutation operation = miniBatchOp.getOperation(0);
-                       Set<byte[]> keySet = operation.getFamilyMap().keySet();
-                       for (byte[] family : keySet) {
-                               if 
(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)
 && FAIL_WRITE) {
-                                       throw new DoNotRetryIOException();
-                               }
-                       }
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 599e601..59a9106 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -89,9 +89,8 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         this.tableDDLOptions = optionBuilder.toString();
     }
     
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+    public static Map<String, String> getServerProperties() {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, 
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
         serverProps.put(" 
yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
@@ -99,8 +98,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
Boolean.FALSE.toString());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
+        serverProps.put(QueryServices.INDEX_FAILURE_DISABLE_INDEX, 
Boolean.TRUE.toString());
+        return serverProps;
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProperties();
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
ReadOnlyProps.EMPTY_PROPS);
     }
     
     @Parameters(name="localIndex = {0}")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
index 4b2371c..a8c1f1e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -21,13 +21,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
@@ -35,7 +31,6 @@ import com.google.common.collect.Maps;
 /**
  * Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
  */
-@RunWith(Parameterized.class)
 public class IndexToolForPartialBuildWithNamespaceEnabledIT extends 
IndexToolForPartialBuildIT {
     
     
@@ -45,15 +40,9 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor
     }
     
     @BeforeClass
+    @Shadower(classBeingShadowed = IndexToolForPartialBuildIT.class)
     public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
-        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, 
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
-        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-        serverProps.put("hbase.client.pause", "5000");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, 
"2000");
-        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"1000");
+        Map<String, String> serverProps = getServerProperties();
         serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
         clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 7f289bf..d9dca1e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -24,16 +24,17 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,26 +45,33 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 /**
  * 
@@ -73,10 +81,12 @@ import com.google.common.collect.Maps;
  * 
  */
 
+@Ignore("Not working with HBase 0.98")
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
     public static final String INDEX_NAME = "IDX";
+    public static final String TABLE_NAME = "T";
 
     public static volatile boolean FAIL_WRITE = false;
     public static volatile String fullTableName;
@@ -89,23 +99,29 @@ public class MutableIndexFailureIT extends BaseTest {
     private final boolean localIndex;
     private final String tableDDLOptions;
     private final boolean isNamespaceMapped;
+    private final boolean leaveIndexActiveOnFailure;
+    private final boolean rebuildIndexOnWriteFailure;
     private String schema = generateUniqueName();
+    private List<CommitException> exceptions = Lists.newArrayList();
 
     @AfterClass
     public static void doTeardown() throws Exception {
         tearDownMiniCluster();
     }
 
-    public MutableIndexFailureIT(boolean transactional, boolean localIndex, 
boolean isNamespaceMapped) {
+    public MutableIndexFailureIT(boolean transactional, boolean localIndex, 
boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean 
rebuildIndexOnWriteFailure) {
         this.transactional = transactional;
         this.localIndex = localIndex;
-        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", 
TRANSACTIONAL=true " : "");
-        this.tableName = (localIndex ? "L_" : "") + 
TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
-                + (isNamespaceMapped ? "_NM" : "");
-        this.indexName = FailingRegionObserver.INDEX_NAME;
+        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", 
TRANSACTIONAL=true " : "") 
+                + (disableIndexOnWriteFailure == null ? "" : (", " + 
PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + 
disableIndexOnWriteFailure))
+                + (rebuildIndexOnWriteFailure == null ? "" : (", " + 
PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + 
rebuildIndexOnWriteFailure));
+        this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
+        this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME;
         fullTableName = SchemaUtil.getTableName(schema, tableName);
         this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
         this.isNamespaceMapped = isNamespaceMapped;
+        this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null 
? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : 
disableIndexOnWriteFailure);
+        this.rebuildIndexOnWriteFailure = 
Boolean.TRUE.equals(rebuildIndexOnWriteFailure);
     }
 
     @BeforeClass
@@ -117,16 +133,30 @@ public class MutableIndexFailureIT extends BaseTest {
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put("data.tx.snapshot.dir", "/tmp");
         serverProps.put("hbase.balancer.period", 
String.valueOf(Integer.MAX_VALUE));
-        Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
Boolean.TRUE.toString());
+        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"4000");
+        Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
-    @Parameters(name = 
"MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2}") 
// name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false, false, true }, { 
false, false, false }, { false, true, true },
-                { false, true, false }, { true, false, true }, { true, true, 
true }, { true, false, false },
-                { true, true, false } });
+    @Parameters(name = 
"MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4}")
 // name is used by failsafe as file name in reports
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[][] { 
+                { false, false, true, true, true }, 
+                { false, false, false, true, true }, 
+                { true, false, false, true, true }, 
+                { true, false, true, true, true },
+                { false, true, true, true, true }, 
+                { false, true, false, true, true }, 
+                { true, true, false, true, true }, 
+                { true, true, true, true, true },
+
+                { false, false, false, null, true }, 
+                { false, true, false, false, true }, 
+                { false, false, false, false, null }, 
+        } 
+        );
     }
 
     @Test
@@ -135,9 +165,9 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public void helpTestWriteFailureDisablesIndex() throws Exception {
-        String secondTableName = fullTableName + "_2";
-        String secondIndexName = indexName + "_2";
-        String secondFullIndexName = fullIndexName + "_2";
+        String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
+//        String thirdIndexName = "C_" + INDEX_NAME;
+//        String thirdFullIndexName = SchemaUtil.getTableName(schema, 
thirdIndexName);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, 
String.valueOf(isNamespaceMapped));
         try (Connection conn = driver.connect(url, props)) {
@@ -149,29 +179,26 @@ public class MutableIndexFailureIT extends BaseTest {
             }
             conn.createStatement().execute("CREATE TABLE " + fullTableName
                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR) " + tableDDLOptions);
-            conn.createStatement().execute("CREATE TABLE " + secondTableName
-                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR) " + tableDDLOptions);
             query = "SELECT * FROM " + fullTableName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             FailingRegionObserver.FAIL_WRITE = false;
             conn.createStatement().execute(
-                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + 
indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
             // Create other index which should be local/global if the other 
index is global/local to
             // check the drop index.
             conn.createStatement().execute(
-                "CREATE INDEX " + indexName + "_3" + " ON "
-                        + fullTableName + " (v2) INCLUDE (v1)");
-            conn.createStatement().execute(
-                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
+                    "CREATE "  + (!localIndex ? "LOCAL " : "") + " INDEX " + 
secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
+//            conn.createStatement().execute(
+//                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + 
thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
 
             query = "SELECT * FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schema), indexName+"%",
+            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schema), null,
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
@@ -179,11 +206,10 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(secondIndexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
-            assertTrue(rs.next());
-            assertEquals(indexName+"_3", rs.getString(3));
-            assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
+//            assertTrue(rs.next());
+//            assertEquals(thirdIndexName, rs.getString(3));
+//            assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
             initializeTable(conn, fullTableName);
-            initializeTable(conn, secondTableName);
             
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -203,15 +229,14 @@ public class MutableIndexFailureIT extends BaseTest {
             assertFalse(rs.next());
 
             FailingRegionObserver.FAIL_WRITE = true;
-            updateTable(conn, fullTableName);
-            updateTable(conn, secondTableName);
+            updateTable(conn, true);
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schema), indexName,
+            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table 
write failure
-            if (transactional) {
+            if (transactional || leaveIndexActiveOnFailure) {
                 assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
             } else {
                 String indexState = rs.getString("INDEX_STATE");
@@ -223,19 +248,7 @@ public class MutableIndexFailureIT extends BaseTest {
             // in an all or none manner. If the table is not transactional, 
then the data writes
             // would have succeeded while the index writes would have failed.
             if (!transactional) {
-                // Verify UPSERT on data table still work after index is 
disabled
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " 
+ fullTableName + " VALUES(?,?,?)");
-                stmt.setString(1, "a3");
-                stmt.setString(2, "x3");
-                stmt.setString(3, "3");
-                stmt.execute();
-                conn.commit();
-                stmt = conn.prepareStatement("UPSERT INTO " + secondTableName 
+ " VALUES(?,?,?)");
-                stmt.setString(1, "a3");
-                stmt.setString(2, "x3");
-                stmt.setString(3, "3");
-                stmt.execute();
-                conn.commit();
+                updateTableAgain(conn, leaveIndexActiveOnFailure);
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -257,14 +270,20 @@ public class MutableIndexFailureIT extends BaseTest {
                 assertEquals("d", rs.getString(2));
                 assertFalse(rs.next());
             }
+            // Comment back in when PHOENIX-3815 is fixed
+//            validateDataWithIndex(conn, fullTableName, thirdFullIndexName, 
false);
 
             // re-enable index table
             FailingRegionObserver.FAIL_WRITE = false;
-            waitForIndexToBeActive(conn,indexName);
-            waitForIndexToBeActive(conn,indexName+"_2");
-            waitForIndexToBeActive(conn,secondIndexName);
+            if (rebuildIndexOnWriteFailure) {
+                // wait for index to be rebuilt automatically
+                waitForIndexToBeRebuilt(conn,indexName);
+            } else {
+                // simulate replaying failed mutation
+                replayMutations();
+            }
 
-            // Verify UPSERT on data table still work after index table is 
recreated
+            // Verify UPSERT on data table still works after index table is 
recreated
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName + " VALUES(?,?,?)");
             stmt.setString(1, "a3");
             stmt.setString(2, "x4");
@@ -272,33 +291,26 @@ public class MutableIndexFailureIT extends BaseTest {
             stmt.execute();
             conn.commit();
             
-            stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " 
VALUES(?,?,?)");
-            stmt.setString(1, "a3");
-            stmt.setString(2, "x4");
-            stmt.setString(3, "4");
-            stmt.execute();
-            conn.commit();
-            // To clear the index name from connection.
-            PhoenixConnection phoenixConn = 
conn.unwrap(PhoenixConnection.class);
-            phoenixConn.getMetaDataCache().removeTable(null, fullTableName, 
null, HConstants.LATEST_TIMESTAMP);
-            // verify index table has correct data
-            validateDataWithIndex(conn, fullTableName, fullIndexName);
-            validateDataWithIndex(conn, secondTableName, secondFullIndexName);
+            // verify index table has correct data (note that second index has 
been dropped)
+            validateDataWithIndex(conn, fullTableName, fullIndexName, 
localIndex);
         } finally {
             FAIL_WRITE = false;
         }
     }
 
-    private void waitForIndexToBeActive(Connection conn, String index) throws 
InterruptedException, SQLException {
+    private void waitForIndexToBeRebuilt(Connection conn, String index) throws 
InterruptedException, SQLException {
         boolean isActive = false;
         if (!transactional) {
-            int maxTries = 4, nTries = 0;
+            int maxTries = 12, nTries = 0;
             do {
-                Thread.sleep(15 * 1000); // sleep 15 secs
-                ResultSet rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schema), index,
-                        new String[] { PTableType.INDEX.toString() });
+                Thread.sleep(5 * 1000); // sleep 5 secs
+                String query = "SELECT CAST(" + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " +
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE 
(" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + 
PhoenixDatabaseMetaData.TABLE_NAME
+                        + ") = (" + "'" + schema + "','" + index + "') "
+                        + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " 
IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+                ResultSet rs = conn.createStatement().executeQuery(query);
                 assertTrue(rs.next());
-                if 
(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+                if (rs.getLong(1) == 0 && !rs.wasNull()) {
                     isActive = true;
                     break;
                 }
@@ -325,14 +337,14 @@ public class MutableIndexFailureIT extends BaseTest {
 
     }
 
-    private void validateDataWithIndex(Connection conn, String tableName, 
String indexName) throws SQLException {
-        String query = "SELECT /*+ INDEX(" + indexName + ")  */ k,v1 FROM " + 
tableName;
+    private void validateDataWithIndex(Connection conn, String fullTableName, 
String fullIndexName, boolean localIndex) throws SQLException {
+        String query = "SELECT /*+ INDEX(" + fullTableName + " " + 
SchemaUtil.getTableNameFromFullName(fullIndexName) + ")  */ k,v1 FROM " + 
fullTableName;
         ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         String expectedPlan = " OVER "
                 + (localIndex
                         ? Bytes.toString(
-                                
SchemaUtil.getPhysicalTableName(tableName.getBytes(), 
isNamespaceMapped).getName())
-                        : 
SchemaUtil.getPhysicalTableName(indexName.getBytes(), 
isNamespaceMapped).getNameAsString());
+                                
SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), 
isNamespaceMapped).getName())
+                        : 
SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), 
isNamespaceMapped).getNameAsString());
         String explainPlan = QueryUtil.getExplainPlan(rs);
         assertTrue(explainPlan, explainPlan.contains(expectedPlan));
         rs = conn.createStatement().executeQuery(query);
@@ -367,8 +379,26 @@ public class MutableIndexFailureIT extends BaseTest {
         }
     }
     
-    private void updateTable(Connection conn, String tableName) throws 
SQLException {
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES(?,?,?)");
+    private void replayMutations() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        for (int i = 0; i < exceptions.size(); i++) {
+            CommitException e = exceptions.get(i);
+            long ts = e.getServerTimestamp();
+            props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, 
Long.toString(ts));
+            try (Connection conn = DriverManager.getConnection(getUrl(), 
props)) {
+                if (i == 0) {
+                    updateTable(conn, false);
+                } else if (i == 1) {
+                    updateTableAgain(conn, false);
+                } else {
+                    fail();
+                }
+            }
+        }
+    }
+    
+    private void updateTable(Connection conn, boolean commitShouldFail) throws 
SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName + " VALUES(?,?,?)");
         // Insert new row
         stmt.setString(1, "d");
         stmt.setString(2, "d");
@@ -380,36 +410,79 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.setString(3, "2");
         stmt.execute();
         // Delete existing row
-        stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE 
k=?");
+        stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE 
k=?");
         stmt.setString(1, "b");
         stmt.execute();
         try {
             conn.commit();
-            fail();
-        } catch (SQLException e) {
-        } catch (Exception e) {
+            if (commitShouldFail) {
+                fail();
+            }
+        } catch (CommitException e) {
+            if (!commitShouldFail) {
+                throw e;
+            }
+            exceptions.add(e);
         }
 
     }
 
+    private void updateTableAgain(Connection conn, boolean commitShouldFail) 
throws SQLException {
+        // Verify UPSERT on data table still works after index is disabled
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName + " VALUES(?,?,?)");
+        stmt.setString(1, "a3");
+        stmt.setString(2, "x3");
+        stmt.setString(3, "3");
+        stmt.execute();
+        try {
+            conn.commit();
+            if (commitShouldFail) {
+                fail();
+            }
+        } catch (CommitException e) {
+            if (!commitShouldFail) {
+                throw e;
+            }
+            exceptions.add(e);
+        }
+    }
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         public static volatile boolean FAIL_WRITE = false;
-        public static final String INDEX_NAME = "IDX";
+        public static final String FAIL_INDEX_NAME = "FAIL_IDX";
+        public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
+
         @Override
         public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            if (tableName.contains(INDEX_NAME) && 
!tableName.contains(INDEX_NAME+"_3") && FAIL_WRITE) {
+            boolean throwException = false;
+            if 
(c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" 
+ FAIL_INDEX_NAME)
+                    && FAIL_WRITE) {
+                throwException = true;
+            } else {
+                // When local index updates are atomic with data updates, 
testing a write failure to a local
+                // index won't make sense.
+                Mutation operation = miniBatchOp.getOperation(0);
+                if (FAIL_WRITE) {
+                    Map<byte[],List<Cell>>cellMap = 
operation.getFamilyCellMap();
+                    for (Map.Entry<byte[],List<Cell>> entry : 
cellMap.entrySet()) {
+                        byte[] family = entry.getKey();
+                        if 
(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX))
 {
+                            int regionStartKeyLen = 
c.getEnvironment().getRegionInfo().getStartKey().length;
+                            Cell firstCell = entry.getValue().get(0);
+                            short indexId = 
MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(firstCell.getRowArray(),
 firstCell.getRowOffset() + regionStartKeyLen, SortOrder.getDefault());
+                            // Only throw for first local index as the test 
may have multiple local indexes
+                            if (indexId == Short.MIN_VALUE) {
+                                throwException = true;
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+            if (throwException) {
                 dropIndex(c);
                 throw new DoNotRetryIOException();
             }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                
if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)
 && FAIL_WRITE) {
-                    dropIndex(c);
-                    throw new DoNotRetryIOException();
-                }
-            }
         }
 
          private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> 
c) {
@@ -417,7 +490,7 @@ public class MutableIndexFailureIT extends BaseTest {
                  Connection connection =
                          
QueryUtil.getConnection(c.getEnvironment().getConfiguration());
                  connection.createStatement().execute(
-                     "DROP INDEX IF EXISTS " + INDEX_NAME + "_3" + " ON "
+                        "DROP INDEX IF EXISTS " + "B_" + FAIL_INDEX_NAME + " 
ON "
                              + fullTableName);
              } catch (ClassNotFoundException e) {
              } catch (SQLException e) {
@@ -425,4 +498,4 @@ public class MutableIndexFailureIT extends BaseTest {
          }
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
deleted file mode 100644
index 18d1744..0000000
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.BaseOwnClusterIT;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-/**
- * 
- * Test for failure of region server to write to index table.
- * For some reason dropping tables after running this test
- * fails unless it runs its own mini cluster. 
- * 
- * 
- * @since 2.1
- */
-
-@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class ReadOnlyIndexFailureIT extends BaseOwnClusterIT {
-    public static volatile boolean FAIL_WRITE = false;
-    public static final String INDEX_NAME = "IDX";
-
-    private String tableName;
-    private String indexName;
-    private String fullTableName;
-    private String fullIndexName;
-    private final boolean localIndex;
-
-    public ReadOnlyIndexFailureIT(boolean localIndex) {
-        this.localIndex = localIndex;
-        this.tableName = (localIndex ? "L_" : "") + 
TestUtil.DEFAULT_DATA_TABLE_NAME;
-        this.indexName = INDEX_NAME;
-        this.fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-        this.fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-    }
-
-    @Parameters(name = "ReadOnlyIndexFailureIT_localIndex={0}") // name is 
used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false }, { true } });
-    }
-
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
-        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-        serverProps.put("hbase.client.pause", "5000");
-        serverProps.put("hbase.balancer.period", 
String.valueOf(Integer.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
"true");
-        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"1000");
-        serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
-        serverProps.put("hbase.coprocessor.abortonerror", "false");
-        serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
-        NUM_SLAVES_BASE = 4;
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
-                ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @Test
-    public void testWriteFailureReadOnlyIndex() throws Exception {
-        helpTestWriteFailureReadOnlyIndex();
-    }
-
-    public void helpTestWriteFailureReadOnlyIndex() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = driver.connect(url, props)) {
-            String query;
-            ResultSet rs;
-            conn.setAutoCommit(false);
-            conn.createStatement().execute(
-                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL 
PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-            query = "SELECT * FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            FAIL_WRITE = false;
-            if(localIndex) {
-                conn.createStatement().execute(
-                        "CREATE LOCAL INDEX " + indexName + " ON " + 
fullTableName 
-                        + " (v1) INCLUDE (v2)");
-            } else {
-                conn.createStatement().execute(
-                        "CREATE INDEX " + indexName + " ON " + fullTableName 
-                        + " (v1) INCLUDE (v2)");
-            }
-
-            query = "SELECT * FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
-                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), 
indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName 
-                    + " VALUES(?,?,?)");
-            stmt.setString(1, "1");
-            stmt.setString(2, "aaa");
-            stmt.setString(3, "a1");
-            stmt.execute();
-            conn.commit();
-
-            FAIL_WRITE = true;
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " 
VALUES(?,?,?)");
-            stmt.setString(1, "2");
-            stmt.setString(2, "bbb");
-            stmt.setString(3, "b2");
-            stmt.execute();
-            try {
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-            }
-
-            // Only successfully committed row should be seen
-            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertFalse(rs.next());
-            
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
-                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), 
indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            // the index is always active for tables upon index table write 
failure
-            assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            // if the table is transactional the write to the index table will 
fail because the
-            // index has not been disabled
-            // Verify UPSERT on data table is blocked  after index write failed
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " 
VALUES(?,?,?)");
-            stmt.setString(1, "3");
-            stmt.setString(2, "ccc");
-            stmt.setString(3, "3c");
-            try {
-                stmt.execute();
-                /* Writes would be blocked */
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-                
assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), 
e.getErrorCode());
-            }
-
-            FAIL_WRITE = false;
-            // Second attempt at writing will succeed
-            int retries = 0;
-            do {
-                Thread.sleep(5 * 1000); // sleep 5 secs
-                if(!hasIndexDisableTimestamp(conn, indexName)){
-                    break;
-                }
-                if (++retries == 5) {
-                    fail("Failed to rebuild index with allowed time");
-                }
-            } while(true);
-
-            // Verify UPSERT on data table still work after index table is 
recreated
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " 
VALUES(?,?,?)");
-            stmt.setString(1, "4");
-            stmt.setString(2, "ddd");
-            stmt.setString(3, "4d");
-            stmt.execute();
-            conn.commit();
-
-            // verify index table has data
-            query = "SELECT count(1) FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            
-            query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + 
fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bbb", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("ddd", rs.getString(1));
-            assertFalse(rs.next());
-
-            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bbb", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("ddd", rs.getString(1));
-            assertFalse(rs.next());
-        }
-    }
-    
-    private static boolean hasIndexDisableTimestamp(Connection conn, String 
indexName) throws SQLException {
-        ResultSet rs = conn.createStatement().executeQuery("SELECT " + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + 
-                " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TABLE_NAME +  " = '" + 
indexName + "'");
-        assertTrue(rs.next());
-        long ts = rs.getLong(1);
-        return (!rs.wasNull() && ts > 0);
-    }
-
-    
-    public static class FailingRegionObserver extends SimpleRegionObserver {
-        @Override
-        public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            if 
(c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME)
 && FAIL_WRITE) {
-                throw new DoNotRetryIOException();
-            }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                
if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)
 && FAIL_WRITE) {
-                    throw new DoNotRetryIOException();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index cee545a..fe9be6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -533,6 +534,10 @@ public class DeleteCompiler {
             } else if (runOnServer) {
                 // TODO: better abstraction
                 Scan scan = context.getScan();
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations 
since there will be
+                // future dated data row mutations that will get in the way of 
generating the
+                // correct index rows on replay.
+                
scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
                 scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, 
QueryConstants.TRUE);
     
                 // Build an ungrouped aggregate query: select COUNT(*) from 
<table> where <where>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 931513a..7b50125 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -713,6 +713,10 @@ public class UpsertCompiler {
                      */
                     final StatementContext context = queryPlan.getContext();
                     final Scan scan = context.getScan();
+                    // Propagate IGNORE_NEWER_MUTATIONS when replaying 
mutations since there will be
+                    // future dated data row mutations that will get in the 
way of generating the
+                    // correct index rows on replay.
+                    
scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
                     
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, 
UngroupedAggregateRegionObserver.serialize(projectedTable));
                     
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, 
UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 582c64d..3a2dd4f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3417,7 +3417,11 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     Cell newDisableTimeStampCell = 
newKVs.get(disableTimeStampKVIndex);
                     long newDisableTimeStamp = (Long) 
PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                             newDisableTimeStampCell.getValueOffset(), 
newDisableTimeStampCell.getValueLength());
-                    if(curTimeStampVal > 0 && curTimeStampVal < 
newDisableTimeStamp){
+                    // We use the sign of the INDEX_DISABLE_TIMESTAMP to 
differentiate the keep-index-active (negative)
+                    // from block-writes-to-data-table case. In either case, 
we want to keep the oldest timestamp to
+                    // drive the partial index rebuild rather than update it 
with each attempt to update the index
+                    // when a new data table write occurs.
+                    if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < 
Math.abs(newDisableTimeStamp)) {
                         // not reset disable timestamp
                         newKVs.remove(disableTimeStampKVIndex);
                         disableTimeStampKVIndex = -1;

Reply via email to