Repository: phoenix
Updated Branches:
  refs/heads/master 944bed735 -> 94601de5f


PHOENIX-4230 Write index updates in postBatchMutateIndispensably for 
transactional tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d13a2e5b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d13a2e5b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d13a2e5b

Branch: refs/heads/master
Commit: d13a2e5b27db8d22344442a9fc9890a37052f0f9
Parents: 944bed7
Author: James Taylor <[email protected]>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Mon Sep 25 18:52:48 2017 -0700

----------------------------------------------------------------------
 .../index/PhoenixTransactionalIndexer.java      | 79 +++++++++++++++++---
 1 file changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d13a2e5b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5444360..969378d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.phoenix.index;
 
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -99,6 +105,15 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
 
     private static final Log LOG = 
LogFactory.getLog(PhoenixTransactionalIndexer.class);
 
+    // Hack to get around not being able to save any state between
+    // coprocessor calls. TODO: remove after HBASE-18127 when available
+    private static class BatchMutateContext {
+        public Collection<Pair<Mutation, byte[]>> indexUpdates = 
Collections.emptyList();
+    }
+    
+    private ThreadLocal<BatchMutateContext> batchMutateContext =
+            new ThreadLocal<BatchMutateContext>();
+    
     private PhoenixIndexCodec codec;
     private IndexWriter writer;
     private boolean stopped;
@@ -117,9 +132,15 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
          */
         clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
                 InterRegionServerIndexRpcControllerFactory.class, 
RpcControllerFactory.class);
+        // lower the number of rpc retries.  We inherit config from 
HConnectionManager#setServerSideHConnectionRetries,
+        // which by default uses a multiplier of 10.  That is too many retries 
for our synchronous index writes
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, 
env.getConfiguration()
+            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new 
DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
-        // setup the actual index writer
         // For transactional tables, we keep the index active upon a write 
failure
         // since we have the all versus none behavior for transactions.
         this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), 
indexWriterEnv, serverName + "-tx-index-writer");
@@ -154,6 +175,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             
         };
     }
+    
     @Override
     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws 
IOException {
@@ -164,6 +186,9 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             return;
         }
 
+        BatchMutateContext context = new BatchMutateContext();
+        setBatchMutateContext(c, context);
+        
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
         byte[] txRollbackAttribute = 
m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
@@ -176,15 +201,10 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, 
getMutationIterator(miniBatchOp), txRollbackAttribute);
+            context.indexUpdates = getIndexUpdates(c.getEnvironment(), 
indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
 
             current.addTimelineAnnotation("Built index updates, doing 
preStep");
-            TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
-
-            // no index updates, so we are done
-            if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates, true);
-            }
+            TracingUtils.addAnnotation(current, "index update count", 
context.indexUpdates.size());
         } catch (Throwable t) {
             String msg = "Failed to update index with entries:" + indexUpdates;
             LOG.error(msg, t);
@@ -192,7 +212,48 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         }
     }
 
-    public static void addMutation(Map<ImmutableBytesPtr, MultiMutation> 
mutations, ImmutableBytesPtr row, Mutation m) {
+    @Override
+    public void 
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean 
success) throws IOException {
+        BatchMutateContext context = getBatchMutateContext(c);
+        if (context == null || context.indexUpdates == null) {
+            return;
+        }
+        // get the current span, or just use a null-span to avoid a bunch of 
if statements
+        try (TraceScope scope = Trace.startSpan("Starting to write index 
updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+
+            if (success) { // if miniBatchOp was successfully written, write 
index updates
+                if (!context.indexUpdates.isEmpty()) {
+                    this.writer.write(context.indexUpdates, true);
+                }
+                current.addTimelineAnnotation("Wrote index updates");
+            }
+        } catch (Throwable t) {
+            String msg = "Failed to write index updates:" + 
context.indexUpdates;
+            LOG.error(msg, t);
+            ServerUtil.throwIOException(msg, t);
+         } finally {
+             removeBatchMutateContext(c);
+         }
+    }
+
+    private void 
setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, 
BatchMutateContext context) {
+        this.batchMutateContext.set(context);
+    }
+    
+    private BatchMutateContext 
getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        return this.batchMutateContext.get();
+    }
+    
+    private void 
removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        this.batchMutateContext.remove();
+    }
+
+    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> 
mutations, ImmutableBytesPtr row, Mutation m) {
         MultiMutation stored = mutations.get(row);
         // we haven't seen this row before, so add it
         if (stored == null) {

Reply via email to