Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 9838dcfc5 -> 70c139483


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 5da8be8..842e881 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -67,7 +69,13 @@ import com.google.common.collect.Multimap;
  */
 public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     private static final Log LOG = 
LogFactory.getLog(PhoenixIndexFailurePolicy.class);
+    public static final String DISABLE_INDEX_ON_WRITE_FAILURE = 
"DISABLE_INDEX_ON_WRITE_FAILURE";
+    public static final String REBUILD_INDEX_ON_WRITE_FAILURE = 
"REBUILD_INDEX_ON_WRITE_FAILURE";
+    public static final String BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE = 
"BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE";
     private RegionCoprocessorEnvironment env;
+    private boolean blockDataTableWritesOnFailure;
+    private boolean disableIndexOnFailure;
+    private boolean rebuildIndexOnFailure;
 
     public PhoenixIndexFailurePolicy() {
         super(new KillServerOnFailurePolicy());
@@ -77,6 +85,31 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
     public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
         super.setup(parent, env);
         this.env = env;
+        rebuildIndexOnFailure = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
+        HTableDescriptor htd = env.getRegion().getTableDesc();
+        // If rebuild index is turned off globally, no need to check the table 
because the background thread
+        // won't be running in this case
+        if (rebuildIndexOnFailure) {
+            String value = htd.getValue(REBUILD_INDEX_ON_WRITE_FAILURE);
+            if (value != null) {
+                rebuildIndexOnFailure = Boolean.parseBoolean(value);
+            }
+        }
+        String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE);
+        if (value == null) {
+            disableIndexOnFailure = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, 
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX);
+        } else {
+            disableIndexOnFailure = Boolean.parseBoolean(value);
+        }
+        value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE);
+        if (value == null) {
+            blockDataTableWritesOnFailure = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
+        } else {
+            blockDataTableWritesOnFailure = Boolean.parseBoolean(value);
+        }
     }
 
     /**
@@ -91,30 +124,34 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
      */
     @Override
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> 
attempted, Exception cause) throws IOException {
-        boolean blockWriteRebuildIndex = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
-                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         boolean throwing = true;
+        long timestamp = HConstants.LATEST_TIMESTAMP;
         try {
-            handleFailureWithExceptions(attempted, cause, 
blockWriteRebuildIndex);
+            timestamp = handleFailureWithExceptions(attempted, cause);
             throwing = false;
         } catch (Throwable t) {
             LOG.warn("handleFailure failed", t);
             super.handleFailure(attempted, cause);
             throwing = false;
         } finally {
-            if (!throwing) throw ServerUtil.createIOException(null, cause);
+            if (!throwing) {
+                throw ServerUtil.wrapInDoNotRetryIOException("Unable to update 
the following indexes: " + attempted.keySet(), cause, timestamp);
+            }
         }
     }
 
-    private void 
handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> 
attempted,
-            Exception cause, boolean blockWriteRebuildIndex) throws Throwable {
+    private long 
handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> 
attempted,
+            Exception cause) throws Throwable {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
         Map<String, Long> indexTableNames = new HashMap<String, 
Long>(refs.size());
         // start by looking at all the tables to which we attempted to write
+        long timestamp = 0;
+        boolean leaveIndexActive = blockDataTableWritesOnFailure || 
!disableIndexOnFailure;
         for (HTableInterfaceReference ref : refs) {
             long minTimeStamp = 0;
 
             // get the minimum timestamp across all the mutations we attempted 
on that table
+            // FIXME: all cell timestamps should be the same
             Collection<Mutation> mutations = attempted.get(ref);
             if (mutations != null) {
                 for (Mutation m : mutations) {
@@ -127,6 +164,7 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
                     }
                 }
             }
+            timestamp = minTimeStamp;
 
             // If the data table has local index column families then get 
local indexes to disable.
             if 
(ref.getTableName().equals(env.getRegion().getTableDesc().getNameAsString())
@@ -139,37 +177,59 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
             }
         }
 
+        // Nothing to do if we're not disabling the index and not rebuilding 
on failure
+        if (!disableIndexOnFailure && !rebuildIndexOnFailure) {
+            return timestamp;
+        }
+
+        PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : 
PIndexState.ACTIVE;
         // for all the index tables that we've found, try to disable them and 
if that fails, try to
         for (Map.Entry<String, Long> tableTimeElement 
:indexTableNames.entrySet()){
             String indexTableName = tableTimeElement.getKey();
             long minTimeStamp = tableTimeElement.getValue();
+            // We need a way of differentiating the block writes to data table 
case from
+            // the leave index active case. In either case, we need to know 
the time stamp
+            // at which writes started failing so we can rebuild from that 
point. If we
+            // keep the index active *and* have a positive 
INDEX_DISABLE_TIMESTAMP_BYTES,
+            // then writes to the data table will be blocked (this is client 
side logic
+            // and we can't change this in a minor release). So we use the 
sign of the
+            // time stamp to differentiate: negative means the index stays active and writes are not blocked.
+            if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
+                minTimeStamp *= -1;
+            }
             // Disable the index by using the updateIndexState method of 
MetaDataProtocol end point coprocessor.
             HTableInterface systemTable = env.getTable(SchemaUtil
                     
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, 
env.getConfiguration()));
-            MetaDataMutationResult result = 
IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp,
-                    systemTable, blockWriteRebuildIndex);
+            MetaDataMutationResult result = 
IndexUtil.setIndexDisableTimeStamp(indexTableName, minTimeStamp,
+                    systemTable, newState);
             if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
                 LOG.info("Index " + indexTableName + " has been dropped. 
Ignore uncommitted mutations");
                 continue;
             }
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) 
{
-                if (blockWriteRebuildIndex) {
+                if (leaveIndexActive) {
                     LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " 
failed with code = "
                             + result.getMutationCode());
-                    throw new DoNotRetryIOException("Attempt to update 
INDEX_DISABLE_TIMESTAMP failed.");
+                    // If we're not disabling the index, then we don't want to 
throw as throwing
+                    // will lead to the RS being shut down.
+                    if (blockDataTableWritesOnFailure) {
+                        throw new DoNotRetryIOException("Attempt to update 
INDEX_DISABLE_TIMESTAMP failed.");
+                    }
                 } else {
                     LOG.warn("Attempt to disable index " + indexTableName + " 
failed with code = "
                             + result.getMutationCode() + ". Will use default 
failure policy instead.");
                     throw new DoNotRetryIOException("Attempt to disable " + 
indexTableName + " failed.");
                 } 
             }
-            if (blockWriteRebuildIndex)
+            if (leaveIndexActive)
                 LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + 
indexTableName + " due to an exception while writing updates.",
                         cause);
             else
                 LOG.info("Successfully disabled index " + indexTableName + " 
due to an exception while writing updates.",
                         cause);
         }
+        // Return the cell time stamp (note they should all be the same)
+        return timestamp;
     }
 
     private Collection<? extends String> 
getLocalIndexNames(HTableInterfaceReference ref,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 2a6e976..453f38f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -69,6 +69,7 @@ import 
org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
@@ -111,7 +112,9 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         codec.initialize(env);
 
         // setup the actual index writer
-        this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
+        // For transactional tables, we keep the index active upon a write 
failure
+        // since transactions already provide all-or-none behavior.
+        this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), 
env, serverName + "-tx-index-writer");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 5f5237f..3650c2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -145,8 +145,9 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     private final int mutateBatchSize;
     private final long mutateBatchSizeBytes;
     private final Long scn;
+    private final boolean replayMutations;
     private MutationState mutationState;
-    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
+    private List<PhoenixStatement> statements = new ArrayList<>();
     private boolean isAutoFlush = false;
     private boolean isAutoCommit = false;
     private PMetaData metaData;
@@ -179,7 +180,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
 
     public PhoenixConnection(PhoenixConnection connection, boolean 
isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) throws SQLException {
-        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.metaData, connection.getMutationState(), 
isDescRowKeyOrderUpgrade, isRunningUpgrade);
+        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.metaData, connection.getMutationState(), 
isDescRowKeyOrderUpgrade, isRunningUpgrade, connection.replayMutations);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -191,7 +192,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(PhoenixConnection connection, MutationState 
mutationState) throws SQLException {
-        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.getMetaDataCache(), mutationState, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade());
+        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.getMetaDataCache(), mutationState, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), 
connection.replayMutations);
     }
     
     public PhoenixConnection(PhoenixConnection connection, long scn) throws 
SQLException {
@@ -199,7 +200,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(ConnectionQueryServices services, 
PhoenixConnection connection, long scn) throws SQLException {
-        this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, 
connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), 
connection.isRunningUpgrade());
+        this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, 
connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), 
connection.isRunningUpgrade(), connection.replayMutations);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -207,14 +208,14 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData) throws SQLException {
-        this(services, url, info, metaData, null, false, false);
+        this(services, url, info, metaData, null, false, false, false);
     }
     
     public PhoenixConnection(PhoenixConnection connection, 
ConnectionQueryServices services, Properties info) throws SQLException {
-        this(services, connection.url, info, connection.metaData, null, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade());
+        this(services, connection.url, info, connection.metaData, null, 
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), 
connection.replayMutations);
     }
     
-    public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, MutationState mutationState, boolean 
isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException {
+    private PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, MutationState mutationState, boolean 
isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, boolean 
replayMutations) throws SQLException {
         GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
         this.url = url;
         this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
@@ -241,7 +242,12 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         
         Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
         checkScn(scnParam);
-        this.scn = scnParam;
+        Long replayAtParam = JDBCUtil.getReplayAt(url, this.info);
+        checkReplayAt(replayAtParam);
+        checkScnAndReplayAtEquality(scnParam,replayAtParam);
+        
+        this.scn = scnParam != null ? scnParam : replayAtParam;
+        this.replayMutations = replayMutations || replayAtParam != null;
         this.isAutoFlush = 
this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, 
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
                 && 
this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, 
QueryServicesOptions.DEFAULT_AUTO_FLUSH) ;
         this.isAutoCommit = JDBCUtil.getAutoCommit(
@@ -315,6 +321,18 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         }
     }
 
+    private static void checkReplayAt(Long replayAtParam) throws SQLException {
+        if (replayAtParam != null && replayAtParam < 0) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_REPLAY_AT).build().buildException();
+        }
+    }
+
+    private static void checkScnAndReplayAtEquality(Long scnParam, Long 
replayAt) throws SQLException {
+        if (scnParam != null && replayAt != null && 
!scnParam.equals(replayAt)) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.UNEQUAL_SCN_AND_REPLAY_AT).build().buildException();
+        }
+    }
+
     private static Properties filterKnownNonProperties(Properties info) {
         Properties prunedProperties = info;
         for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
@@ -444,6 +462,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         return scn;
     }
     
+    public boolean isReplayMutations() {
+        return replayMutations;
+    }
+    
     public int getMutateBatchSize() {
         return mutateBatchSize;
     }
@@ -493,7 +515,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     }
 
     private void closeStatements() throws SQLException {
-        List<SQLCloseable> statements = this.statements;
+        List<? extends PhoenixStatement> statements = this.statements;
         // create new list to prevent close of statements
         // from modifying this list.
         this.statements = Lists.newArrayList();
@@ -569,6 +591,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         throw new SQLFeatureNotSupportedException();
     }
 
+    public List<PhoenixStatement> getStatements() {
+        return statements;
+    }
+    
     @Override
     public Statement createStatement() throws SQLException {
         PhoenixStatement statement = new PhoenixStatement(this);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 3606593..da216ed 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -225,8 +225,16 @@ public class IndexTool extends Configured implements Tool {
                 if (index.getIndexState().equals(PIndexState.BUILDING)) {
                     disableIndexes.add(index.getTableName().getString());
                     disabledPIndexes.add(index);
-                    if (minDisableTimestamp > 
index.getIndexDisableTimestamp()) {
-                        minDisableTimestamp = index.getIndexDisableTimestamp();
+                    // We need a way of differentiating the block writes to 
data table case from
+                    // the leave index active case. In either case, we need to 
know the time stamp
+                    // at which writes started failing so we can rebuild from 
that point. If we
+                    // keep the index active *and* have a positive 
INDEX_DISABLE_TIMESTAMP_BYTES,
+                    // then writes to the data table will be blocked (this is 
client side logic
+                    // and we can't change this in a minor release). So we use 
the sign of the
+                    // time stamp to differentiate, and take its absolute value here to get the actual time.
+                    long indexDisableTimestamp = 
Math.abs(index.getIndexDisableTimestamp());
+                    if (minDisableTimestamp > indexDisableTimestamp) {
+                        minDisableTimestamp = indexDisableTimestamp;
                         indexWithMinDisableTimestamp = index;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 463819c..97f431f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -68,7 +68,6 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -783,6 +782,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         HTableDescriptor tableDescriptor = (existingDesc != null) ? new 
HTableDescriptor(existingDesc)
         : new HTableDescriptor(
                 SchemaUtil.getPhysicalHBaseTableName(tableName, 
isNamespaceMapped, tableType).getBytes());
+        // By default, an index is not automatically rebuilt/caught up on a write failure (NOTE(review): the default is set in MetaDataClient, not in this loop - confirm)
         for (Entry<String,Object> entry : tableProps.entrySet()) {
             String key = entry.getKey();
             if (!TableProperty.isPhoenixTableProperty(key)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c01d11f..81d05bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -138,8 +138,9 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String 
INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = 
"phoenix.index.rebuild.batch.perTable";
 
-    // A master switch if to block writes when index build failed
+    // Block writes to data table when index write fails
     public static final String INDEX_FAILURE_BLOCK_WRITE = 
"phoenix.index.failure.block.write";
+    public static final String INDEX_FAILURE_DISABLE_INDEX = 
"phoenix.index.failure.disable.index";
 
     // Index will be partially re-built from index disable time stamp - 
following overlap time
     public static final String 
INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1ddf7eb..5541dcf 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -168,7 +168,8 @@ public class QueryServicesOptions {
     public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; 
// auto rebuild on
     public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false; 
-    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 
10000; // 10 secs
+    public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = false; 
+    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 
60000; // 60 secs
     public static final long 
DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 50ff64b..8005e4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -159,6 +159,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -230,7 +231,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -1877,6 +1877,11 @@ public class MetaDataClient {
             if (tableType == PTableType.TABLE) {
                 Boolean isAppendOnlySchemaProp = (Boolean) 
TableProperty.APPEND_ONLY_SCHEMA.getValue(tableProps);
                 isAppendOnlySchema = isAppendOnlySchemaProp!=null ? 
isAppendOnlySchemaProp : false;
+                
+                // By default, do not rebuild indexes on write failure
+                if 
(tableProps.get(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE) == 
null) {
+                    
tableProps.put(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE, 
Boolean.FALSE);
+                }
             }
 
             // Can't set any of these on views or shared indexes on views

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 3e2c9b5..986debd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -713,17 +713,13 @@ public class IndexUtil {
             HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
-    public static MetaDataMutationResult disableIndexWithTimestamp(String 
indexTableName, long minTimeStamp,
-            HTableInterface metaTable, boolean blockWriteRebuildIndex) throws 
ServiceException, Throwable {
+    public static MetaDataMutationResult setIndexDisableTimeStamp(String 
indexTableName, long minTimeStamp,
+            HTableInterface metaTable, PIndexState newState) throws 
ServiceException, Throwable {
         byte[] indexTableKey = 
SchemaUtil.getTableKeyFromFullName(indexTableName);
         // Mimic the Put that gets generated by the client on an update of the 
index state
         Put put = new Put(indexTableKey);
-        if (blockWriteRebuildIndex)
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.ACTIVE.getSerializedBytes());
-        else
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.DISABLE.getSerializedBytes());
+        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                newState.getSerializedBytes());
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                 PLong.INSTANCE.toBytes(minTimeStamp));
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 2cab6fb..76d454b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -133,6 +133,11 @@ public class JDBCUtil {
         return (scnStr == null ? null : Long.parseLong(scnStr));
     }
 
+    public static Long getReplayAt(String url, Properties info) throws 
SQLException {
+        String scnStr = findProperty(url, info, 
PhoenixRuntime.REPLAY_AT_ATTRIB);
+        return (scnStr == null ? null : Long.parseLong(scnStr));
+    }
+
     @Deprecated // use getMutateBatchSizeBytes
     public static int getMutateBatchSize(String url, Properties info, 
ReadOnlyProps props) throws SQLException {
         String batchSizeStr = findProperty(url, info, 
PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 167a35c..7f8731a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -124,6 +124,20 @@ public class PhoenixRuntime {
     public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
 
     /**
+     * Use this connection property to set the long time stamp value at
+     * which to replay DML statements after a write failure. The time
+     * stamp value must match the value returned by 
+     * {@link org.apache.phoenix.execute.CommitException#getServerTimestamp()}
+     * when the exception occurred. Used in conjunction with the 
+     * {@link org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy}
+     * index write failure policy so that the client can replay updates
+     * and ensure that secondary indexes are correctly caught up
+     * with any data updates when a write failure occurs. The updates
+     * should be replayed in ascending time stamp order.
+     */
+    public static final String REPLAY_AT_ATTRIB = "ReplayAt";
+
+    /**
      * Use this connection property to help with fairness of resource allocation
      * for the client and server. The value of the attribute determines the
      * bucket used to rollup resource usage for a particular tenant/organization. Each tenant

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 6e2bbba..607cbc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -25,6 +25,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -44,6 +45,8 @@ public class ServerUtil {
     
     private static final String FORMAT = "ERROR %d (%s): %s";
     private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) 
\\((\\w+)\\): (.*)");
+    private static final Pattern PATTERN_FOR_TS = 
Pattern.compile(",serverTimestamp=(\\d+),");
+    private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
     private static final Map<Class<? extends Exception>, SQLExceptionCode> 
errorcodeMap
         = new HashMap<Class<? extends Exception>, SQLExceptionCode>();
     static {
@@ -176,4 +179,38 @@ public class ServerUtil {
         }
         return getTableFromSingletonPool(env, tableName);
     }
+    
+    public static long parseServerTimestamp(Throwable t) {
+        while (t.getCause() != null) {
+            t = t.getCause();
+        }
+        return parseTimestampFromRemoteException(t);
+    }
+
+    private static long parseTimestampFromRemoteException(Throwable t) {
+        String message = t.getLocalizedMessage();
+        if (message != null) {
+            // If the message contains the serverTimestamp marker, parse and return its value.
+            Matcher matcher = PATTERN_FOR_TS.matcher(t.getLocalizedMessage());
+            if (matcher.find()) {
+                String tsString = matcher.group(1);
+                if (tsString != null) {
+                    return Long.parseLong(tsString);
+                }
+            }
+        }
+        return HConstants.LATEST_TIMESTAMP;
+    }
+
+    public static DoNotRetryIOException wrapInDoNotRetryIOException(String 
msg, Throwable t, long timestamp) {
+        if (msg == null) {
+            msg = "";
+        }
+        if (t instanceof SQLException) {
+            msg = constructSQLErrorMessage((SQLException) t, msg);
+        }
+        msg += String.format(FORMAT_FOR_TIMESTAMP, timestamp);
+        return new DoNotRetryIOException(msg, t);
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index 76ea933..8317b5c 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -38,11 +38,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -71,11 +73,15 @@ public class TestIndexWriter {
     assertNotNull(IndexWriter.getCommitter(env));
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void getDefaultFailurePolicy() throws Exception {
     Configuration conf = new Configuration(false);
     RegionCoprocessorEnvironment env = 
Mockito.mock(RegionCoprocessorEnvironment.class);
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
     Mockito.when(env.getConfiguration()).thenReturn(conf);
+    Mockito.when(region.getTableDesc()).thenReturn(new HTableDescriptor());
     assertNotNull(IndexWriter.getFailurePolicy(env));
   }
 

Reply via email to