Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 abb334db9 -> 30f6feb13


PHOENIX-4230 Write index updates in postBatchMutateIndispensably for 
transactional tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/30f6feb1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/30f6feb1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/30f6feb1

Branch: refs/heads/4.x-HBase-0.98
Commit: 30f6feb138effaf28ec5a6dacbc64218662d4561
Parents: abb334d
Author: James Taylor <[email protected]>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Tue Sep 26 10:26:39 2017 -0700

----------------------------------------------------------------------
 .../index/PhoenixTransactionalIndexer.java      | 127 ++++++++++---------
 1 file changed, 64 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/30f6feb1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index a671eb1..4853cb1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.phoenix.index;
 
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,7 +34,6 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,14 +41,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -55,7 +57,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import 
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ScanRanges;
@@ -104,11 +105,18 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
 
     private static final Log LOG = 
LogFactory.getLog(PhoenixTransactionalIndexer.class);
 
+    // Hack to get around not being able to save any state between
+    // coprocessor calls. TODO: remove after HBASE-18127 when available
+    private static class BatchMutateContext {
+        public Collection<Pair<Mutation, byte[]>> indexUpdates = 
Collections.emptyList();
+    }
+    
+    private ThreadLocal<BatchMutateContext> batchMutateContext =
+            new ThreadLocal<BatchMutateContext>();
+    
     private PhoenixIndexCodec codec;
     private IndexWriter writer;
     private boolean stopped;
-    private Map<Long, Collection<Pair<Mutation, byte[]>>> localUpdates =
-            new ConcurrentHashMap<Long, Collection<Pair<Mutation, byte[]>>>();
 
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -124,9 +132,15 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
          */
         clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
                 InterRegionServerIndexRpcControllerFactory.class, 
RpcControllerFactory.class);
+        // lower the number of rpc retries.  We inherit config from 
HConnectionManager#setServerSideHConnectionRetries,
+        // which by default uses a multiplier of 10.  That is too many retries 
for our synchronous index writes
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, 
env.getConfiguration()
+            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new 
DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
-        // setup the actual index writer
         // For transactional tables, we keep the index active upon a write 
failure
         // since we have the all versus none behavior for transactions.
         this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), 
indexWriterEnv, serverName + "-tx-index-writer");
@@ -161,6 +175,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             
         };
     }
+    
     @Override
     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws 
IOException {
@@ -171,8 +186,10 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             return;
         }
 
+        BatchMutateContext context = new BatchMutateContext();
+        setBatchMutateContext(c, context);
+        
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
-        String tableName = 
c.getEnvironment().getRegion().getTableDesc().getNameAsString();
         PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
         byte[] txRollbackAttribute = 
m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
@@ -184,27 +201,10 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, 
getMutationIterator(miniBatchOp), txRollbackAttribute);
-            Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = 
indexUpdates.iterator();
-            List<Pair<Mutation, byte[]>> localIndexUpdates = new 
ArrayList<Pair<Mutation, byte[]>>(indexUpdates.size());
-            while(indexUpdatesItr.hasNext()) {
-                Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-                if(tableName.equals(Bytes.toString(next.getSecond()))) {
-                    localIndexUpdates.add(next);
-                    indexUpdatesItr.remove();
-                }
-            }
-            if(!localIndexUpdates.isEmpty()) {
-                byte[] bs = 
indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); 
-                localUpdates.put(Bytes.toLong(bs), localIndexUpdates);
-            }
-            current.addTimelineAnnotation("Built index updates, doing 
preStep");
-            TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
+            context.indexUpdates = getIndexUpdates(c.getEnvironment(), 
indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
 
-            // no index updates, so we are done
-            if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates, false);
-            }
+            current.addTimelineAnnotation("Built index updates, doing 
preStep");
+            TracingUtils.addAnnotation(current, "index update count", 
context.indexUpdates.size());
         } catch (Throwable t) {
             String msg = "Failed to update index with entries:" + indexUpdates;
             LOG.error(msg, t);
@@ -213,43 +213,44 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     }
 
     @Override
-    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put 
put, WALEdit edit,
-            Durability durability) throws IOException {
-        Map<String,byte[]> updateAttributes = put.getAttributesMap();
-        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
-        byte[] bs = 
indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID);
-        if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) {
-            super.prePut(e, put, edit, durability);
-        } else {
-            Collection<Pair<Mutation, byte[]>> localIndexUpdates = 
localUpdates.remove(Bytes.toLong(bs));
-            try{
-                this.writer.write(localIndexUpdates, true);
-            } catch (Throwable t) {
-                String msg = "Failed to update index with entries:" + 
localIndexUpdates;
-                LOG.error(msg, t);
-                ServerUtil.throwIOException(msg, t);
-            }
+    public void 
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean 
success) throws IOException {
+        BatchMutateContext context = getBatchMutateContext(c);
+        if (context == null || context.indexUpdates == null) {
+            return;
         }
-    }
+        // get the current span, or just use a null-span to avoid a bunch of 
if statements
+        try (TraceScope scope = Trace.startSpan("Starting to write index 
updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
 
-    @Override
-    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, 
Delete delete,
-            WALEdit edit, Durability durability) throws IOException {
-        Map<String,byte[]> updateAttributes = delete.getAttributesMap();
-        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
-        byte[] bs = 
indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID);
-        if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) {
-            super.postDelete(e, delete, edit, durability);
-        } else {
-            Collection<Pair<Mutation, byte[]>> localIndexUpdates = 
localUpdates.remove(Bytes.toLong(bs));
-            try{
-                this.writer.write(localIndexUpdates, true);
-            } catch (Throwable t) {
-                String msg = "Failed to update index with entries:" + 
localIndexUpdates;
-                LOG.error(msg, t);
-                ServerUtil.throwIOException(msg, t);
+            if (success) { // if miniBatchOp was successfully written, write 
index updates
+                if (!context.indexUpdates.isEmpty()) {
+                    this.writer.write(context.indexUpdates, true);
+                }
+                current.addTimelineAnnotation("Wrote index updates");
             }
-        }
+        } catch (Throwable t) {
+            String msg = "Failed to write index updates:" + 
context.indexUpdates;
+            LOG.error(msg, t);
+            ServerUtil.throwIOException(msg, t);
+         } finally {
+             removeBatchMutateContext(c);
+         }
+    }
+
+    private void 
setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, 
BatchMutateContext context) {
+        this.batchMutateContext.set(context);
+    }
+    
+    private BatchMutateContext 
getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        return this.batchMutateContext.get();
+    }
+    
+    private void 
removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        this.batchMutateContext.remove();
     }
 
     private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> 
mutations, ImmutableBytesPtr row, Mutation m) {

Reply via email to