Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 4beb182db -> 6bcee5776


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 1fb8221..7e4f1a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,11 +41,13 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -73,7 +74,6 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
@@ -99,7 +99,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
-    private boolean blockWriteRebuildIndex = false;
     private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
     @Override
@@ -108,13 +107,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         executor.shutdownNow();
         GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
     }
-    
+
     @Override
     public void start(CoprocessorEnvironment env) throws IOException {
-        // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves 
+        // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves
         // among region servers because we relies on server time of RS which is hosting
         // SYSTEM.CATALOG
-        long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB, 
+        long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL);
         try {
             if(sleepTime > 0) {
@@ -123,12 +122,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
         }
-        enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
+        enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
-        rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
+        rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
-        blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
-               QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         
     }
     
@@ -171,7 +168,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         t.setDaemon(true);
         t.start();
 
-        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
+        if (!enableRebuildIndex) {
             LOG.info("Failure Index Rebuild is skipped by configuration.");
             return;
         }
@@ -190,7 +187,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             LOG.error("BuildIndexScheduleTask cannot start!", ex);
         }
     }
-    
+
     /**
     * Task runs periodically to build indexes whose INDEX_NEED_PARTIALLY_REBUILD is set true
      *
@@ -205,8 +202,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
         public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
             this.env = env;
-            this.rebuildIndexBatchSize = env.getConfiguration()
-                    .getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP);
+            this.rebuildIndexBatchSize = env.getConfiguration().getLong(
+                    QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP);
             this.configuredBatches = env.getConfiguration().getLong(
                     QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches);
         }
@@ -227,20 +224,20 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             try {
                 Scan scan = new Scan();
                 SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareFilter.CompareOp.GREATER,
-                        PLong.INSTANCE.toBytes(0L));
+                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                    CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
                 filter.setFilterIfMissing(true);
                 scan.setFilter(filter);
-                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_NAME_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
+                    PhoenixDatabaseMetaData.TABLE_NAME_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                PreparedStatement updateDisabledTimeStampSmt = null;
+                    PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
+                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
+                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
 
                 Map<PTable, List<PTable>> dataTableToIndexesMap = null;
-                MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
                 scanner = this.env.getRegion().getScanner(scan);
@@ -252,26 +249,17 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
                     Result r = Result.create(results);
                     byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                            PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
                     byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                             PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
 
-                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
-                            && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
-                        // Don't rebuild the building index , because they are marked for aysnc
+                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                         continue;
                     }
 
-                    // disableTimeStamp has to be a positive value
-                    long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0,
-                            SortOrder.getDefault());
-                    if (disabledTimeStampVal <= 0) {
-                        continue;
-                    }
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                            PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                    if ((dataTable == null || dataTable.length == 0)
-                            || (indexState == null || indexState.length == 0)) {
+                        PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
+                    if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
                         // data table name can't be empty
                         continue;
                     }
@@ -289,21 +277,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     }
 
                     if (conn == null) {
-                        final Properties props = new Properties();
-                        // Set SCN so that we don't ping server and have the upper bound set back to
-                        // the timestamp when the failure occurred.
-                        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
-
-                        // Set timeout to max value as rebuilding may take time
-                        props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
-                        props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
-                                Long.toString(Long.MAX_VALUE));
-                        props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
-                        // don't run a second index populations upsert select
-                        props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
-                        conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration())
-                                .unwrap(PhoenixConnection.class);
-                        client = new MetaDataClient(conn);
+                       final Properties props = new Properties();
+                       // Set SCN so that we don't ping server and have the upper bound set back to
+                       // the timestamp when the failure occurred.
+                       props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
+                       
+                       //Set timeout to max value as rebuilding may take time
+                       props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+                       props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
+                       props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
+                       // don't run a second index populations upsert select 
+                        props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); 
+                        conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
                         dataTableToIndexesMap = Maps.newHashMap();
                     }
                     String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -315,7 +300,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     if (!dataPTable.getIndexes().contains(indexPTable)) {
                         continue;
                     }
-
+                    
                     if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                         LOG.debug("Index rebuild has been skipped because not all regions of index table="
                                 + indexPTable.getName() + " are online.");
@@ -332,173 +317,198 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
                     }
                     LOG.debug("We have found " + indexPTable.getIndexState() + " Index:" + indexPTable.getName()
-                            + " on data table:" + dataPTable.getName() + " which was disabled at "
+                            + " on data table:" + dataPTable.getName() + " which failed to be updated at "
                             + indexPTable.getIndexDisableTimestamp());
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
-                if (dataTableToIndexesMap != null) {
-                    long overlapTime = env.getConfiguration().getLong(
-                            QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
-                            QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
-                        PTable dataPTable = entry.getKey();
-                        List<PTable> indexesToPartiallyRebuild = entry.getValue();
-                        ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator());
-                        try (HTableInterface metaTable = env.getTable(
-                                SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
-                            long earliestDisableTimestamp = Long.MAX_VALUE;
-                            List<IndexMaintainer> maintainers = Lists
-                                    .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
-                            for (PTable index : indexesToPartiallyRebuild) {
-                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
-                                if (disabledTimeStampVal > 0) {
-                                    if (disabledTimeStampVal < earliestDisableTimestamp) {
-                                        earliestDisableTimestamp = disabledTimeStampVal;
-                                    }
 
-                                    maintainers.add(index.getIndexMaintainer(dataPTable, conn));
-                                }
-                            }
-                            // No indexes are disabled, so skip this table
-                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
-                                continue;
-                            }
-                            long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
-                            LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
-                                    + " from timestamp=" + timeStamp);
-
-                            TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
-                            // TODO Need to set high timeout
-                            PostDDLCompiler compiler = new PostDDLCompiler(conn);
-                            MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
-                                    HConstants.LATEST_TIMESTAMP);
-                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
-                                    maintainers);
-
-                            long scanEndTime = getTimestampForBatch(timeStamp,
-                                    batchExecutedPerTableMap.get(dataPTable.getName()));
-                            dataTableScan.setTimeRange(timeStamp, scanEndTime);
-                            dataTableScan.setCacheBlocks(false);
-                            dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
-
-                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
-                                    ByteUtil.EMPTY_BYTE_ARRAY);
-                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
-                                    conn);
-                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-                            MutationState mutationState = plan.execute();
-                            long rowCount = mutationState.getUpdateCount();
-                            LOG.info(rowCount + " rows of index which are rebuild");
-                            for (PTable indexPTable : indexesToPartiallyRebuild) {
-                                String indexTableFullName = SchemaUtil.getTableName(
-                                        indexPTable.getSchemaName().getString(),
-                                        indexPTable.getTableName().getString());
-                                if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
-                                    updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE,
-                                            PIndexState.ACTIVE);
-                                    batchExecutedPerTableMap.remove(dataPTable.getName());
-                                } else {
-
-                                    updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
-                                    Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
-                                    if (noOfBatches == null) {
-                                        noOfBatches = 0l;
+                               if (dataTableToIndexesMap != null) {
+                                       long overlapTime = env.getConfiguration().getLong(
+                                                       QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+                                                       QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+                                       for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
+                                               PTable dataPTable = entry.getKey();
+                                               List<PTable> indexesToPartiallyRebuild = entry.getValue();
+                                               ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator());
+                                               try (HTableInterface metaTable = env.getTable(
+                                                               SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
+                                                       long earliestDisableTimestamp = Long.MAX_VALUE;
+                                                       List<IndexMaintainer> maintainers = Lists
+                                                                       .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                                                       int signOfDisableTimeStamp = 0;
+                                                       for (PTable index : indexesToPartiallyRebuild) {
+                                                   // We need a way of differentiating the block writes to data table case from
+                                                   // the leave index active case. In either case, we need to know the time stamp
+                                                   // at which writes started failing so we can rebuild from that point. If we
+                                                   // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+                                                   // then writes to the data table will be blocked (this is client side logic
+                                                   // and we can't change this in a minor release). So we use the sign of the
+                                                   // time stamp to differentiate.
+                                                               long disabledTimeStampVal = index.getIndexDisableTimestamp();
+                                                               if (disabledTimeStampVal != 0) {
+                                    if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) {
+                                        LOG.warn("Found unexpected mix of signs with INDEX_DISABLE_TIMESTAMP for " + dataPTable.getName().getString() + " with " + indexesToPartiallyRebuild);
                                     }
-                                    batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches);
-                                    // clearing cache to get the updated
-                                    // disabled timestamp
-                                    new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(),
-                                            dataPTable.getTableName().getString());
-                                    new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(),
-                                            indexPTable.getTableName().getString());
-                                    LOG.info(
-                                            "During Round-robin build: Successfully updated index disabled timestamp  for "
-                                                    + indexTableFullName + " to " + scanEndTime);
-                                }
-
+                                                                   signOfDisableTimeStamp = Long.signum(disabledTimeStampVal);
+                                       disabledTimeStampVal = Math.abs(disabledTimeStampVal);
+                                                                       if (disabledTimeStampVal < earliestDisableTimestamp) {
+                                                                               earliestDisableTimestamp = disabledTimeStampVal;
+                                                                       }
+
+                                                                       maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+                                                               }
+                                                       }
+                                                       // No indexes are disabled, so skip this table
+                                                       if (earliestDisableTimestamp == Long.MAX_VALUE) {
+                                                               continue;
+                                                       }
+                                                       long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+                                                       LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                                                       + " from timestamp=" + timeStamp);
+                                                       
+                                                       TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
+                                                       // TODO Need to set high timeout
+                                                       PostDDLCompiler compiler = new PostDDLCompiler(conn);
+                                                       MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
+                                                                       HConstants.LATEST_TIMESTAMP);
+                                                       Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
+                                                                       maintainers);
+
+                                                       long scanEndTime = getTimestampForBatch(timeStamp,
+                                                                       batchExecutedPerTableMap.get(dataPTable.getName()));
+                                                       
+                                                       dataTableScan.setTimeRange(timeStamp, scanEndTime);
+                                                       dataTableScan.setCacheBlocks(false);
+                                                       dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+
+                                                       ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+                                                                       ByteUtil.EMPTY_BYTE_ARRAY);
+                                                       IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
+                                                                       conn);
+                                                       byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                                                       dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
+                            LOG.info("Starting to partially build indexes:" + indexesToPartiallyRebuild
+                                    + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:"
+                                    + earliestDisableTimestamp + " till "
+                                    + (scanEndTime == HConstants.LATEST_TIMESTAMP ? "LATEST_TIMESTAMP" : scanEndTime));
+                                                       MutationState mutationState = plan.execute();
+                                                       long rowCount = mutationState.getUpdateCount();
+                            if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
+                                LOG.info("Rebuild completed for all inactive/disabled indexes in data table:"
+                                        + dataPTable.getName());
                             }
-                        } catch (Exception e) { // Log, but try next table's
-                            // indexes
-                            LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
-                                    + ". Will try again next on next scheduled invocation.", e);
-                        }
-                    }
-                }
-            } catch (Throwable t) {
-                LOG.warn("ScheduledBuildIndexTask failed!", t);
-            } finally {
-                inProgress.decrementAndGet();
-                if (scanner != null) {
-                    try {
-                        scanner.close();
-                    } catch (IOException ignored) {
-                        LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
-                    }
-                }
-                if (conn != null) {
-                    try {
-                        conn.close();
-                    } catch (SQLException ignored) {
-                        LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
-                    }
-                }
-            }
-
+                            LOG.info(" no. of datatable rows read in rebuilding process is " + rowCount);
+                                                       for (PTable indexPTable : indexesToPartiallyRebuild) {
+                                                               String indexTableFullName = SchemaUtil.getTableName(
+                                                                               indexPTable.getSchemaName().getString(),
+                                                                               indexPTable.getTableName().getString());
+                                                               if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
+                                                                       updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE,
+                                                                                       PIndexState.ACTIVE);
+                                                                       batchExecutedPerTableMap.remove(dataPTable.getName());
+                                    LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
+                                                               } else {
+                                                                   // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above)
+                                                                       updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime * signOfDisableTimeStamp, metaTable);
+                                                                       Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
+                                                                       if (noOfBatches == null) {
+                                                                               noOfBatches = 0l;
+                                                                       }
+                                                                       batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches);
+                                                                       // clearing cache to get the updated
+                                                                       // disabled timestamp
+                                                                       new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(),
+                                                                                       dataPTable.getTableName().getString());
+                                                                       new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(),
+                                                                                       indexPTable.getTableName().getString());
+                                                                       LOG.info(
+                                                                                       "During Round-robin build: Successfully updated index disabled timestamp  for "
+                                                                                                       + indexTableFullName + " to " + scanEndTime);
+                                                               }
+
+                                                       }
+                                               } catch (Exception e) { // Log, but try next table's
+                                                                               // indexes
+                                                       LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                                                       + ". Will try again next on next scheduled invocation.", e);
+                                               }
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               LOG.warn("ScheduledBuildIndexTask failed!", t);
+                       } finally {
+                               inProgress.decrementAndGet();
+                               if (scanner != null) {
+                                       try {
+                                               scanner.close();
+                                       } catch (IOException ignored) {
+                                               LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
+                                       }
+                               }
+                               if (conn != null) {
+                                       try {
+                                               conn.close();
+                                       } catch (SQLException ignored) {
+                                               LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
+                                       }
+                               }
+                       }
         }
 
         private long getTimestampForBatch(long disabledTimeStamp, Long noOfBatches) {
             if (disabledTimeStamp < 0 || rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP
                     - disabledTimeStamp)) { return HConstants.LATEST_TIMESTAMP; }
             long timestampForNextBatch = disabledTimeStamp + rebuildIndexBatchSize;
-            if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis()
-                    || (noOfBatches != null && noOfBatches > configuredBatches)) {
-                // if timestampForNextBatch cross current time , then we should
-                // build the complete index
-                timestampForNextBatch = HConstants.LATEST_TIMESTAMP;
-            }
+                       if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis()
+                                       || (noOfBatches != null && noOfBatches > configuredBatches)) {
+                               // if timestampForNextBatch cross current time , then we should
+                               // build the complete index
+                               timestampForNextBatch = HConstants.LATEST_TIMESTAMP;
+                       }
             return timestampForNextBatch;
         }
-
-        private static void updateIndexState(PhoenixConnection conn, String indexTableName,
-                RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState)
-                throws ServiceException, Throwable {
-            byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-            String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
-            String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
-            // Mimic the Put that gets generated by the client on an update of the
-            // index state
-            Put put = new Put(indexTableKey);
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    newState.getSerializedBytes());
-            if (newState == PIndexState.ACTIVE) {
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
-            }
-            final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
-            MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
-            MutationCode code = result.getMutationCode();
-            if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); }
-            if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder(
-                    SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
-                            .setMessage(" currentState=" + oldState + ". requestedState=" + newState)
-                            .setSchemaName(schemaName).setTableName(indexName).build().buildException(); }
-        }
-
-        private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName,
-                RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable)
-                throws IOException {
-            byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-            Put put = new Put(indexTableKey);
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(disabledTimestamp));
-            metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.INDEX_STATE_BYTES, 
-                    PIndexState.INACTIVE.getSerializedBytes(), put);
-
-        }
-
     }
-}
+    
+       private static void updateIndexState(PhoenixConnection conn, String indexTableName,
+                       RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState)
+                                       throws ServiceException, Throwable {
+               byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+               String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+               String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
+               // Mimic the Put that gets generated by the client on an update of the
+               // index state
+               Put put = new Put(indexTableKey);
+               put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                               newState.getSerializedBytes());
+               if (newState == PIndexState.ACTIVE) {
+                       put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                       PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+                       put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                       PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+               }
+               final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
+               MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
+               MutationCode code = result.getMutationCode();
+               if (code == MutationCode.TABLE_NOT_FOUND) {
+                       throw new TableNotFoundException(schemaName, indexName);
+               }
+               if (code == MutationCode.UNALLOWED_TABLE_MUTATION) {
+                       throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
+                                       .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
+                                       .setTableName(indexName).build().buildException();
+               }
+       }
+
+       private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName,
+                       RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException {
+               byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+               Put put = new Put(indexTableKey);
+               put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                               PLong.INSTANCE.toBytes(disabledTimestamp));
+               RowMutations rowMutations = new RowMutations(indexTableKey);
+               rowMutations.add(put);
+               metaTable.checkAndMutate(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                               PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0),
+                               rowMutations);
+       }
+}
\ No newline at end of file

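Note on the sign trick above: the single INDEX_DISABLE_TIMESTAMP cell now encodes both the time writes started failing (the absolute value) and the failure-handling mode (the sign). A minimal sketch of that convention, assuming, per the in-line comment, that a positive value blocks data table writes while a negative value leaves the index active; the helper class and its method names are illustrative only and not part of this patch:

    // Illustrative helper mirroring the Long.signum()/Math.abs() handling in BuildIndexScheduleTask.
    public final class DisableTimestampEncoding {
        private DisableTimestampEncoding() {}

        // Keep the wall-clock failure time; use the sign as the mode flag.
        static long encode(long failureTimeMillis, boolean leaveIndexActive) {
            return leaveIndexActive ? -failureTimeMillis : failureTimeMillis;
        }

        // The rebuild scan starts from the absolute value (minus the configured overlap).
        static long rebuildFromTime(long storedValue) {
            return Math.abs(storedValue);
        }

        // The sign must survive batch updates, hence
        // updateDisableTimestamp(..., scanEndTime * signOfDisableTimeStamp, ...) above.
        static int mode(long storedValue) {
            return Long.signum(storedValue);
        }
    }
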
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 23b8be0..c9866c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -378,6 +378,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         
         RegionScanner theScanner = s;
         
+        boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
         List<Expression> selectExpressions = null;
@@ -609,6 +610,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             Cell firstKV = results.get(0);
                             Delete delete = new Delete(firstKV.getRowArray(),
                                 firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+                            if (replayMutations) {
+                                delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                            }
                             mutations.add(delete);
                             // force tephra to ignore this deletes
                             delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -660,6 +664,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 }
                             }
                             for (Mutation mutation : row.toRowMutations()) {
+                                if (replayMutations) {
+                                    mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                }
                                 mutations.add(mutation);
                             }
                             for (i = 0; i < selectExpressions.size(); i++) {

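The replay flag added above travels as a scan attribute and is copied onto every mutation the coprocessor emits, presumably so that index maintenance ignores newer cell versions while regenerating index rows. A sketch of the round trip under those assumptions (rowKey and the surrounding setup are placeholders, not code from this patch):

    // Client/compiler side: mark the scan as a replay.
    Scan scan = new Scan();
    scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);

    // Server side, as in the hunks above: detect the flag once per scan...
    boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;

    // ...and tag each generated mutation before it is committed.
    Put put = new Put(rowKey);
    if (replayMutations) {
        put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
    }
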
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2836c45..35ba187 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -182,6 +182,8 @@ public enum SQLExceptionCode {
     ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP."),
     ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."),
     INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."),
+    INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
+    UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."),
      /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..b0d22d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -24,10 +24,16 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 public class CommitException extends SQLException {
     private static final long serialVersionUID = 2L;
     private final int[] uncommittedStatementIndexes;
+    private final long serverTimestamp;
 
-    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+    public CommitException(Exception e, int[] uncommittedStatementIndexes, long serverTimestamp) {
         super(e);
         this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+        this.serverTimestamp = serverTimestamp;
+    }
+    
+    public long getServerTimestamp() {
+        return this.serverTimestamp;
     }
 
     /**

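With the server timestamp attached, a caller can tell when the failed batch was attempted on the server. A usage sketch; the replay policy suggested in the comments is hypothetical, and only getServerTimestamp() comes from this patch:

    void commitWithReplayInfo(java.sql.Connection conn) throws java.sql.SQLException {
        try {
            conn.commit();
        } catch (CommitException e) {
            // Timestamp parsed out of the server exception (see ServerUtil.parseServerTimestamp
            // in the MutationState diff below); LATEST_TIMESTAMP if none could be parsed.
            long serverTs = e.getServerTimestamp();
            // A client could, for example, reopen a connection pinned at serverTs and
            // replay the statements that were not committed.
            throw e;
        }
    }
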
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index a04725c..aa6c195 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -80,6 +80,7 @@ import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -673,6 +674,14 @@ public class MutationState implements SQLCloseable {
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
+            if (connection.isReplayMutations()) {
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                // future dated data row mutations that will get in the way of generating the
+                // correct index rows on replay.
+                for (Mutation mutation : rowMutations) {
+                    mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                }
+                }
+            }
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
                     .addAll(rowMutationsPertainingToIndex);
         }
@@ -1030,6 +1039,7 @@ public class MutationState implements SQLCloseable {
                    joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
                 }
             }
+            long serverTimestamp = HConstants.LATEST_TIMESTAMP;
             Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
             while (mutationsIterator.hasNext()) {
                 Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
@@ -1106,6 +1116,7 @@ public class MutationState implements SQLCloseable {
                         // Remove batches as we process them
                         mutations.remove(origTableRef);
                     } catch (Exception e) {
+                        serverTimestamp = ServerUtil.parseServerTimestamp(e);
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
                             if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
@@ -1127,7 +1138,7 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client an exception that indicates the 
statements that
                         // were not committed successfully.
-                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp);
                     } finally {
                         try {
                             if (cache!=null) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 831aa16..a037e92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -61,6 +61,10 @@ public class IndexWriter implements Stoppable {
     this(getCommitter(env), getFailurePolicy(env), env, name);
   }
 
+  public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
+      this(getCommitter(env), failurePolicy, env, name);
+    }
+
  public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
     Configuration conf = env.getConfiguration();
     try {

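The new constructor lets callers pick a failure policy explicitly while keeping the configured committer. A sketch wiring it to the LeaveIndexActiveFailurePolicy added in the next diff; whether the constructor invokes the policy's setup() internally is not shown in this hunk, so treat this as illustrative wiring only:

    // Sketch: an IndexWriter whose write failures leave the index active.
    IndexFailurePolicy policy = new LeaveIndexActiveFailurePolicy();
    IndexWriter writer = new IndexWriter(policy, env, "phoenix-index-writer");
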
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
new file mode 100644
index 0000000..edacd3a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.util.ServerUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * 
+ * Implementation of IndexFailurePolicy which takes no action when an
+ * index cannot be updated. As with the standard flow of control, an
+ * exception will still be thrown back to the client. Using this failure
+ * policy means that the action to take upon failure is completely up
+ * to the client.
+ *
+ */
+public class LeaveIndexActiveFailurePolicy implements IndexFailurePolicy {
+
+    @Override
+    public boolean isStopped() {
+        return false;
+    }
+
+    @Override
+    public void stop(String arg0) {
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+            throws IOException {
+        // get timestamp of first cell
+        long ts = attempted.values().iterator().next().getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+        throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, ts);
+    }
+
+}

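Per the PhoenixIndexFailurePolicy diff that follows, this behavior is selected per table through descriptor properties (DISABLE_INDEX_ON_WRITE_FAILURE, REBUILD_INDEX_ON_WRITE_FAILURE, BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE). A hedged example of setting one with the plain HBase 0.98 admin API; the table name is a placeholder, and in practice such properties would more likely be supplied through Phoenix DDL:

    // Hypothetical: a write failure on MY_TABLE should leave its indexes active.
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf("MY_TABLE"));
    htd.setValue(PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE, "false");
    admin.modifyTable(htd.getTableName(), htd);
    admin.close();
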
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 5da8be8..842e881 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -67,7 +69,13 @@ import com.google.common.collect.Multimap;
  */
 public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
+    public static final String DISABLE_INDEX_ON_WRITE_FAILURE = "DISABLE_INDEX_ON_WRITE_FAILURE";
+    public static final String REBUILD_INDEX_ON_WRITE_FAILURE = "REBUILD_INDEX_ON_WRITE_FAILURE";
+    public static final String BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE = "BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE";
     private RegionCoprocessorEnvironment env;
+    private boolean blockDataTableWritesOnFailure;
+    private boolean disableIndexOnFailure;
+    private boolean rebuildIndexOnFailure;
 
     public PhoenixIndexFailurePolicy() {
         super(new KillServerOnFailurePolicy());
@@ -77,6 +85,31 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
         super.setup(parent, env);
         this.env = env;
+        rebuildIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
+        HTableDescriptor htd = env.getRegion().getTableDesc();
+        // If rebuild index is turned off globally, no need to check the table because the background thread
+        // won't be running in this case
+        if (rebuildIndexOnFailure) {
+            String value = htd.getValue(REBUILD_INDEX_ON_WRITE_FAILURE);
+            if (value != null) {
+                rebuildIndexOnFailure = Boolean.parseBoolean(value);
+            }
+        }
+        String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE);
+        if (value == null) {
+            disableIndexOnFailure = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, 
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX);
+        } else {
+            disableIndexOnFailure = Boolean.parseBoolean(value);
+        }
+        value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE);
+        if (value == null) {
+            blockDataTableWritesOnFailure = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
+        } else {
+            blockDataTableWritesOnFailure = Boolean.parseBoolean(value);
+        }
     }
 
     /**
@@ -91,30 +124,34 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
      */
     @Override
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> 
attempted, Exception cause) throws IOException {
-        boolean blockWriteRebuildIndex = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
-                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         boolean throwing = true;
+        long timestamp = HConstants.LATEST_TIMESTAMP;
         try {
-            handleFailureWithExceptions(attempted, cause, 
blockWriteRebuildIndex);
+            timestamp = handleFailureWithExceptions(attempted, cause);
             throwing = false;
         } catch (Throwable t) {
             LOG.warn("handleFailure failed", t);
             super.handleFailure(attempted, cause);
             throwing = false;
         } finally {
-            if (!throwing) throw ServerUtil.createIOException(null, cause);
+            if (!throwing) {
+                throw ServerUtil.wrapInDoNotRetryIOException("Unable to update 
the following indexes: " + attempted.keySet(), cause, timestamp);
+            }
         }
     }
 
-    private void 
handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> 
attempted,
-            Exception cause, boolean blockWriteRebuildIndex) throws Throwable {
+    private long 
handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> 
attempted,
+            Exception cause) throws Throwable {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
         Map<String, Long> indexTableNames = new HashMap<String, 
Long>(refs.size());
         // start by looking at all the tables to which we attempted to write
+        long timestamp = 0;
+        boolean leaveIndexActive = blockDataTableWritesOnFailure || 
!disableIndexOnFailure;
         for (HTableInterfaceReference ref : refs) {
             long minTimeStamp = 0;
 
             // get the minimum timestamp across all the mutations we attempted 
on that table
+            // FIXME: all cell timestamps should be the same
             Collection<Mutation> mutations = attempted.get(ref);
             if (mutations != null) {
                 for (Mutation m : mutations) {
@@ -127,6 +164,7 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
                     }
                 }
             }
+            timestamp = minTimeStamp;
 
             // If the data table has local index column families then get 
local indexes to disable.
             if 
(ref.getTableName().equals(env.getRegion().getTableDesc().getNameAsString())
@@ -139,37 +177,59 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
             }
         }
 
+        // Nothing to do if we're not disabling the index and not rebuilding 
on failure
+        if (!disableIndexOnFailure && !rebuildIndexOnFailure) {
+            return timestamp;
+        }
+
+        PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : 
PIndexState.ACTIVE;
         // for all the index tables that we've found, try to disable them 
and, if that fails, fall back to the default failure policy
         for (Map.Entry<String, Long> tableTimeElement 
:indexTableNames.entrySet()){
             String indexTableName = tableTimeElement.getKey();
             long minTimeStamp = tableTimeElement.getValue();
+            // We need a way of differentiating the block writes to data table 
case from
+            // the leave index active case. In either case, we need to know 
the time stamp
+            // at which writes started failing so we can rebuild from that 
point. If we
+            // keep the index active *and* have a positive 
INDEX_DISABLE_TIMESTAMP_BYTES,
+            // then writes to the data table will be blocked (this is client 
side logic
+            // and we can't change this in a minor release). So we use the 
sign of the
+            // time stamp to differentiate.
+            if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
+                minTimeStamp *= -1;
+            }
             // Disable the index by using the updateIndexState method of the 
MetaDataProtocol endpoint coprocessor.
             HTableInterface systemTable = env.getTable(SchemaUtil
                     
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, 
env.getConfiguration()));
-            MetaDataMutationResult result = 
IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp,
-                    systemTable, blockWriteRebuildIndex);
+            MetaDataMutationResult result = 
IndexUtil.setIndexDisableTimeStamp(indexTableName, minTimeStamp,
+                    systemTable, newState);
             if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
                 LOG.info("Index " + indexTableName + " has been dropped. 
Ignore uncommitted mutations");
                 continue;
             }
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) 
{
-                if (blockWriteRebuildIndex) {
+                if (leaveIndexActive) {
                     LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " 
failed with code = "
                             + result.getMutationCode());
-                    throw new DoNotRetryIOException("Attempt to update 
INDEX_DISABLE_TIMESTAMP failed.");
+                    // If we're not disabling the index, then we don't want to 
throw as throwing
+                    // will lead to the RS being shutdown.
+                    if (blockDataTableWritesOnFailure) {
+                        throw new DoNotRetryIOException("Attempt to update 
INDEX_DISABLE_TIMESTAMP failed.");
+                    }
                 } else {
                     LOG.warn("Attempt to disable index " + indexTableName + " 
failed with code = "
                             + result.getMutationCode() + ". Will use default 
failure policy instead.");
                     throw new DoNotRetryIOException("Attempt to disable " + 
indexTableName + " failed.");
                 } 
             }
-            if (blockWriteRebuildIndex)
+            if (leaveIndexActive)
                 LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + 
indexTableName + " due to an exception while writing updates.",
                         cause);
             else
                 LOG.info("Successfully disabled index " + indexTableName + " 
due to an exception while writing updates.",
                         cause);
         }
+        // Return the cell time stamp (note they should all be the same)
+        return timestamp;
     }
 
     private Collection<? extends String> 
getLocalIndexNames(HTableInterfaceReference ref,
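
To make the sign convention in the comment above concrete, here is a small
standalone sketch of the encode/decode round trip (illustrative values only,
not code from this patch):

    // Server side (PhoenixIndexFailurePolicy): negate the failure timestamp
    // when the index stays active and data table writes are not blocked.
    long minTimeStamp = 1509321600000L;        // example failure time
    boolean disableIndexOnFailure = false;
    boolean blockDataTableWritesOnFailure = false;
    if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
        minTimeStamp *= -1;                    // stored as -1509321600000
    }
    // Reader side (e.g. IndexTool below): the magnitude is the rebuild
    // starting point, regardless of sign.
    long rebuildFrom = Math.abs(minTimeStamp); // 1509321600000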

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index b794ddb..2599f59 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -66,6 +66,7 @@ import 
org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
@@ -111,7 +112,9 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         codec.initialize(env);
 
         // setup the actual index writer
-        this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
+        // For transactional tables, we keep the index active upon a write 
failure
+        // since transactional writes are all-or-nothing.
+        this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), 
env, serverName + "-tx-index-writer");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index b489583..5af418d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -143,8 +143,9 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     private final int mutateBatchSize;
     private final long mutateBatchSizeBytes;
     private final Long scn;
+    private final boolean replayMutations;
     private MutationState mutationState;
-    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
+    private List<PhoenixStatement> statements = new ArrayList<>();
     private boolean isAutoFlush = false;
     private boolean isAutoCommit = false;
     private PMetaData metaData;
@@ -176,7 +177,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
 
     public PhoenixConnection(PhoenixConnection connection, boolean 
isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) throws SQLException {
-        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.metaData, connection.getMutationState(), 
isDescRowKeyOrderUpgrade, isRunningUpgrade);
+        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.metaData, connection.getMutationState(), 
isDescRowKeyOrderUpgrade, isRunningUpgrade, connection.replayMutations);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -188,7 +189,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(PhoenixConnection connection, MutationState 
mutationState) throws SQLException {
-        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.getMetaDataCache(), mutationState, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade());
+        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.getMetaDataCache(), mutationState, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), 
connection.replayMutations);
     }
     
     public PhoenixConnection(PhoenixConnection connection, long scn) throws 
SQLException {
@@ -196,7 +197,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(ConnectionQueryServices services, 
PhoenixConnection connection, long scn) throws SQLException {
-        this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, 
connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), 
connection.isRunningUpgrade());
+        this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, 
connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), 
connection.isRunningUpgrade(), connection.replayMutations);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -204,14 +205,14 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData) throws SQLException {
-        this(services, url, info, metaData, null, false, false);
+        this(services, url, info, metaData, null, false, false, false);
     }
     
     public PhoenixConnection(PhoenixConnection connection, 
ConnectionQueryServices services, Properties info) throws SQLException {
-        this(services, connection.url, info, connection.metaData, null, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade());
+        this(services, connection.url, info, connection.metaData, null, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), 
connection.replayMutations);
     }
     
-    public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, MutationState mutationState, boolean 
isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException {
+    private PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, MutationState mutationState, boolean 
isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, boolean 
replayMutations) throws SQLException {
         GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
         this.url = url;
         this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
@@ -238,7 +239,12 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         
         Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
         checkScn(scnParam);
-        this.scn = scnParam;
+        Long replayAtParam = JDBCUtil.getReplayAt(url, this.info);
+        checkReplayAt(replayAtParam);
+        checkScnAndReplayAtEquality(scnParam, replayAtParam);
+        
+        this.scn = scnParam != null ? scnParam : replayAtParam;
+        this.replayMutations = replayMutations || replayAtParam != null;
         this.isAutoFlush = 
this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, 
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
                 && 
this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, 
QueryServicesOptions.DEFAULT_AUTO_FLUSH) ;
         this.isAutoCommit = JDBCUtil.getAutoCommit(
@@ -309,6 +315,18 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         }
     }
 
+    private static void checkReplayAt(Long replayAtParam) throws SQLException {
+        if (replayAtParam != null && replayAtParam < 0) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_REPLAY_AT).build().buildException();
+        }
+    }
+
+    private static void checkScnAndReplayAtEquality(Long scnParam, Long 
replayAt) throws SQLException {
+        if (scnParam != null && replayAt != null && 
!scnParam.equals(replayAt)) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.UNEQUAL_SCN_AND_REPLAY_AT).build().buildException();
+        }
+    }
+
     private static Properties filterKnownNonProperties(Properties info) {
         Properties prunedProperties = info;
         for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
@@ -438,6 +456,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         return scn;
     }
     
+    public boolean isReplayMutations() {
+        return replayMutations;
+    }
+    
     public int getMutateBatchSize() {
         return mutateBatchSize;
     }
@@ -487,7 +509,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
 
     private void closeStatements() throws SQLException {
-        List<SQLCloseable> statements = this.statements;
+        List<? extends PhoenixStatement> statements = this.statements;
         // create new list to prevent close of statements
         // from modifying this list.
         this.statements = Lists.newArrayList();
@@ -563,6 +585,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         throw new SQLFeatureNotSupportedException();
     }
 
+    public List<PhoenixStatement> getStatements() {
+        return statements;
+    }
+    
     @Override
     public Statement createStatement() throws SQLException {
         PhoenixStatement statement = new PhoenixStatement(this);
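
The checkReplayAt/checkScnAndReplayAtEquality validations above boil down to
a simple contract: ReplayAt must be non-negative, and if CurrentSCN is also
set it must equal ReplayAt. A minimal sketch of the client-visible behavior
(the JDBC URL and timestamp are illustrative):

    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, "1509321600000");
    // Legal: CurrentSCN may also be set, but only to the same value.
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, "1509321600000");
    // Illegal combinations fail on connect:
    //   ReplayAt = "-1"                    -> INVALID_REPLAY_AT
    //   ReplayAt = "1", CurrentSCN = "2"   -> UNEQUAL_SCN_AND_REPLAY_AT
    Connection conn =
            DriverManager.getConnection("jdbc:phoenix:localhost", props);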

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index cb649d1..cc207d9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -225,8 +225,16 @@ public class IndexTool extends Configured implements Tool {
                 if (index.getIndexState().equals(PIndexState.BUILDING)) {
                     disableIndexes.add(index.getTableName().getString());
                     disabledPIndexes.add(index);
-                    if (minDisableTimestamp > 
index.getIndexDisableTimestamp()) {
-                        minDisableTimestamp = index.getIndexDisableTimestamp();
+                    // We need a way of differentiating the block writes to 
data table case from
+                    // the leave index active case. In either case, we need to 
know the time stamp
+                    // at which writes started failing so we can rebuild from 
that point. If we
+                    // keep the index active *and* have a positive 
INDEX_DISABLE_TIMESTAMP_BYTES,
+                    // then writes to the data table will be blocked (this is 
client side logic
+                    // and we can't change this in a minor release). So we use 
the sign of the
+                    // time stamp to differentiate.
+                    long indexDisableTimestamp = 
Math.abs(index.getIndexDisableTimestamp());
+                    if (minDisableTimestamp > indexDisableTimestamp) {
+                        minDisableTimestamp = indexDisableTimestamp;
                         indexWithMinDisableTimestamp = index;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 16db802..0da67ad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -784,6 +784,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         HTableDescriptor tableDescriptor = (existingDesc != null) ? new 
HTableDescriptor(existingDesc)
         : new HTableDescriptor(
                 SchemaUtil.getPhysicalHBaseTableName(tableName, 
isNamespaceMapped, tableType).getBytes());
+        // By default, do not automatically rebuild/catch up an index on a 
write failure
         for (Entry<String,Object> entry : tableProps.entrySet()) {
             String key = entry.getKey();
             if (!TableProperty.isPhoenixTableProperty(key)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2627207..0c3bb85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -137,8 +137,9 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String 
INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = 
"phoenix.index.rebuild.batch.perTable";
 
-    // A master switch if to block writes when index build failed
+    // Block writes to data table when index write fails
     public static final String INDEX_FAILURE_BLOCK_WRITE = 
"phoenix.index.failure.block.write";
+    public static final String INDEX_FAILURE_DISABLE_INDEX = 
"phoenix.index.failure.disable.index";
 
     // Index will be partially re-built from index disable time stamp - 
following overlap time
     public static final String 
INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index eef964f..b9c01f3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -167,7 +167,8 @@ public class QueryServicesOptions {
     public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; 
// auto rebuild on
     public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false; 
-    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 
10000; // 10 secs
+    public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = false; 
+    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 
60000; // 60 secs
     public static final long 
DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
 
     /**
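
As a sketch, the block-write and new disable-index defaults above can be set
explicitly through the standard HBase Configuration API (both shown at their
default values):

    Configuration conf = HBaseConfiguration.create();
    // Default: do not block data table writes when an index write fails.
    conf.setBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, false);
    // Default: do not transition the index to DISABLE on a write failure.
    conf.setBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, false);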

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8b79867..042ab7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -159,6 +159,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -1877,6 +1878,11 @@ public class MetaDataClient {
             if (tableType == PTableType.TABLE) {
                 Boolean isAppendOnlySchemaProp = (Boolean) 
TableProperty.APPEND_ONLY_SCHEMA.getValue(tableProps);
                 isAppendOnlySchema = isAppendOnlySchemaProp!=null ? 
isAppendOnlySchemaProp : false;
+                
+                // By default, do not rebuild indexes on write failure
+                if 
(tableProps.get(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE) == 
null) {
+                    
tableProps.put(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE, 
Boolean.FALSE);
+                }
             }
 
             // Can't set any of these on views or shared indexes on views

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 8d48204..e473198 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -714,17 +714,13 @@ public class IndexUtil {
             HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
-    public static MetaDataMutationResult disableIndexWithTimestamp(String 
indexTableName, long minTimeStamp,
-            HTableInterface metaTable, boolean blockWriteRebuildIndex) throws 
ServiceException, Throwable {
+    public static MetaDataMutationResult setIndexDisableTimeStamp(String 
indexTableName, long minTimeStamp,
+            HTableInterface metaTable, PIndexState newState) throws 
ServiceException, Throwable {
         byte[] indexTableKey = 
SchemaUtil.getTableKeyFromFullName(indexTableName);
         // Mimic the Put that gets generated by the client on an update of the 
index state
         Put put = new Put(indexTableKey);
-        if (blockWriteRebuildIndex)
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.ACTIVE.getSerializedBytes());
-        else
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.DISABLE.getSerializedBytes());
+        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                newState.getSerializedBytes());
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                 PLong.INSTANCE.toBytes(minTimeStamp));
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index c081904..d4cfa34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -126,6 +126,11 @@ public class JDBCUtil {
         return (scnStr == null ? null : Long.parseLong(scnStr));
     }
 
+    public static Long getReplayAt(String url, Properties info) throws 
SQLException {
+        String replayAtStr = findProperty(url, info, 
PhoenixRuntime.REPLAY_AT_ATTRIB);
+        return (replayAtStr == null ? null : Long.parseLong(replayAtStr));
+    }
+
     @Deprecated // use getMutateBatchSizeBytes
     public static int getMutateBatchSize(String url, Properties info, 
ReadOnlyProps props) throws SQLException {
         String batchSizeStr = findProperty(url, info, 
PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 6af881b..0a1fd79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -124,6 +124,20 @@ public class PhoenixRuntime {
     public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
 
     /**
+     * Use this connection property to set the long time stamp value at
+     * which to replay DML statements after a write failure. The time
+     * stamp value must match the value returned by 
+     * {@link org.apache.phoenix.execute.CommitException#getServerTimestamp()}
+     * when the exception occurred. Used in conjunction with the 
+     * {@link 
org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy}
+     * index write failure policy to provide a means for the client to replay
+     * updates, ensuring that secondary indexes are correctly caught up
+     * with any data updates when a write failure occurs. The updates
+     * should be replayed in ascending time stamp order.
+     */
+    public static final String REPLAY_AT_ATTRIB = "ReplayAt";
+
+    /**
      * Use this connection property to help with fairness of resource 
allocation
      * for the client and server. The value of the attribute determines the
      * bucket used to rollup resource usage for a particular 
tenant/organization. Each tenant
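
Putting ReplayAt together with the server timestamp carried by the wrapped
failure, a client replay loop might look roughly like the sketch below. The
table, DML, and URL are illustrative, and CommitException (which extends
SQLException) is assumed to surface the failure, per the Javadoc above:

    String url = "jdbc:phoenix:localhost";          // illustrative
    String dml = "UPSERT INTO T VALUES (1, 'a')";   // illustrative
    try (Connection conn = DriverManager.getConnection(url)) {
        conn.createStatement().executeUpdate(dml);
        conn.commit();
    } catch (CommitException e) {
        // Replay the same DML at the failure's server timestamp so the
        // secondary index catches up with the data table. Multiple
        // replays must be issued in ascending timestamp order.
        Properties props = new Properties();
        props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB,
                Long.toString(e.getServerTimestamp()));
        try (Connection replay = DriverManager.getConnection(url, props)) {
            replay.createStatement().executeUpdate(dml);
            replay.commit();
        }
    }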

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index aee1c2e..c5adfa4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -25,6 +25,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -44,6 +45,8 @@ public class ServerUtil {
     
     private static final String FORMAT = "ERROR %d (%s): %s";
     private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) 
\\((\\w+)\\): (.*)");
+    private static final Pattern PATTERN_FOR_TS = 
Pattern.compile(",serverTimestamp=(\\d+),");
+    private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
     private static final Map<Class<? extends Exception>, SQLExceptionCode> 
errorcodeMap
         = new HashMap<Class<? extends Exception>, SQLExceptionCode>();
     static {
@@ -181,4 +184,38 @@ public class ServerUtil {
         }
         return getTableFromSingletonPool(env, tableName);
     }
+    
+    public static long parseServerTimestamp(Throwable t) {
+        while (t.getCause() != null) {
+            t = t.getCause();
+        }
+        return parseTimestampFromRemoteException(t);
+    }
+
+    private static long parseTimestampFromRemoteException(Throwable t) {
+        String message = t.getLocalizedMessage();
+        if (message != null) {
+            // If the message matches the timestamp pattern, recover the 
embedded server timestamp.
+            Matcher matcher = PATTERN_FOR_TS.matcher(message);
+            if (matcher.find()) {
+                String tsString = matcher.group(1);
+                if (tsString != null) {
+                    return Long.parseLong(tsString);
+                }
+            }
+        }
+        return HConstants.LATEST_TIMESTAMP;
+    }
+
+    public static DoNotRetryIOException wrapInDoNotRetryIOException(String 
msg, Throwable t, long timestamp) {
+        if (msg == null) {
+            msg = "";
+        }
+        if (t instanceof SQLException) {
+            msg = constructSQLErrorMessage((SQLException) t, msg);
+        }
+        msg += String.format(FORMAT_FOR_TIMESTAMP, timestamp);
+        return new DoNotRetryIOException(msg, t);
+    }
+    
 }
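
For illustration, the two helpers above round-trip as follows when the
exception is inspected directly (in practice the exception typically arrives
wrapped, which is why parseServerTimestamp() first walks to the root cause;
the message text is illustrative):

    // Server side: append ",serverTimestamp=<ts>," to the message.
    DoNotRetryIOException ioe = ServerUtil.wrapInDoNotRetryIOException(
            "Unable to update the following indexes: [MY_IDX]",
            null, 1509321600000L);
    // Client side: recover the embedded timestamp; returns
    // HConstants.LATEST_TIMESTAMP when no timestamp is present.
    long serverTs = ServerUtil.parseServerTimestamp(ioe); // 1509321600000L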

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index dda4248..257ebfc 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -38,11 +38,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -71,11 +73,15 @@ public class TestIndexWriter {
     assertNotNull(IndexWriter.getCommitter(env));
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void getDefaultFailurePolicy() throws Exception {
     Configuration conf = new Configuration(false);
     RegionCoprocessorEnvironment env = 
Mockito.mock(RegionCoprocessorEnvironment.class);
+    HRegion region = Mockito.mock(HRegion.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
     Mockito.when(env.getConfiguration()).thenReturn(conf);
+    Mockito.when(region.getTableDesc()).thenReturn(new HTableDescriptor());
     assertNotNull(IndexWriter.getFailurePolicy(env));
   }
 
