Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 abb334db9 -> 30f6feb13
PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/30f6feb1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/30f6feb1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/30f6feb1 Branch: refs/heads/4.x-HBase-0.98 Commit: 30f6feb138effaf28ec5a6dacbc64218662d4561 Parents: abb334d Author: James Taylor <[email protected]> Authored: Mon Sep 25 18:52:48 2017 -0700 Committer: James Taylor <[email protected]> Committed: Tue Sep 26 10:26:39 2017 -0700 ---------------------------------------------------------------------- .../index/PhoenixTransactionalIndexer.java | 127 ++++++++++--------- 1 file changed, 64 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/30f6feb1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index a671eb1..4853cb1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -17,6 +17,11 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -29,7 +34,6 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,14 +41,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -55,7 +57,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ScanRanges; @@ -104,11 +105,18 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class); + // Hack to get around not being able to save any state between + // coprocessor calls. TODO: remove after HBASE-18127 when available + private static class BatchMutateContext { + public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList(); + } + + private ThreadLocal<BatchMutateContext> batchMutateContext = + new ThreadLocal<BatchMutateContext>(); + private PhoenixIndexCodec codec; private IndexWriter writer; private boolean stopped; - private Map<Long, Collection<Pair<Mutation, byte[]>>> localUpdates = - new ConcurrentHashMap<Long, Collection<Pair<Mutation, byte[]>>>(); @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -124,9 +132,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { */ clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, + // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes + clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, + DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); + clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration() + .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); // setup the actual index writer - // setup the actual index writer // For transactional tables, we keep the index active upon a write failure // since we have the all versus none behavior for transactions. this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer"); @@ -161,6 +175,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { }; } + @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { @@ -171,8 +186,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return; } + BatchMutateContext context = new BatchMutateContext(); + setBatchMutateContext(c, context); + Map<String,byte[]> updateAttributes = m.getAttributesMap(); - String tableName = c.getEnvironment().getRegion().getTableDesc().getNameAsString(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); Collection<Pair<Mutation, byte[]>> indexUpdates = null; @@ -184,27 +201,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } // get the index updates for all elements in this batch - indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); - Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator(); - List<Pair<Mutation, byte[]>> localIndexUpdates = new ArrayList<Pair<Mutation, byte[]>>(indexUpdates.size()); - while(indexUpdatesItr.hasNext()) { - Pair<Mutation, byte[]> next = indexUpdatesItr.next(); - if(tableName.equals(Bytes.toString(next.getSecond()))) { - localIndexUpdates.add(next); - indexUpdatesItr.remove(); - } - } - if(!localIndexUpdates.isEmpty()) { - byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); - localUpdates.put(Bytes.toLong(bs), localIndexUpdates); - } - current.addTimelineAnnotation("Built index updates, doing preStep"); - TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); + context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); - // no index updates, so we are done - if (!indexUpdates.isEmpty()) { - this.writer.write(indexUpdates, false); - } + current.addTimelineAnnotation("Built index updates, doing preStep"); + TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size()); } catch (Throwable t) { String msg = "Failed to update index with entries:" + indexUpdates; LOG.error(msg, t); @@ -213,43 +213,44 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } @Override - public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, - Durability durability) throws IOException { - Map<String,byte[]> updateAttributes = put.getAttributesMap(); - PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes); - byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); - if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) { - super.prePut(e, put, edit, durability); - } else { - Collection<Pair<Mutation, byte[]>> localIndexUpdates = localUpdates.remove(Bytes.toLong(bs)); - try{ - this.writer.write(localIndexUpdates, true); - } catch (Throwable t) { - String msg = "Failed to update index with entries:" + localIndexUpdates; - LOG.error(msg, t); - ServerUtil.throwIOException(msg, t); - } + public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException { + BatchMutateContext context = getBatchMutateContext(c); + if (context == null || context.indexUpdates == null) { + return; } - } + // get the current span, or just use a null-span to avoid a bunch of if statements + try (TraceScope scope = Trace.startSpan("Starting to write index updates")) { + Span current = scope.getSpan(); + if (current == null) { + current = NullSpan.INSTANCE; + } - @Override - public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, - WALEdit edit, Durability durability) throws IOException { - Map<String,byte[]> updateAttributes = delete.getAttributesMap(); - PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes); - byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); - if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) { - super.postDelete(e, delete, edit, durability); - } else { - Collection<Pair<Mutation, byte[]>> localIndexUpdates = localUpdates.remove(Bytes.toLong(bs)); - try{ - this.writer.write(localIndexUpdates, true); - } catch (Throwable t) { - String msg = "Failed to update index with entries:" + localIndexUpdates; - LOG.error(msg, t); - ServerUtil.throwIOException(msg, t); + if (success) { // if miniBatchOp was successfully written, write index updates + if (!context.indexUpdates.isEmpty()) { + this.writer.write(context.indexUpdates, true); + } + current.addTimelineAnnotation("Wrote index updates"); } - } + } catch (Throwable t) { + String msg = "Failed to write index updates:" + context.indexUpdates; + LOG.error(msg, t); + ServerUtil.throwIOException(msg, t); + } finally { + removeBatchMutateContext(c); + } + } + + private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) { + this.batchMutateContext.set(context); + } + + private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) { + return this.batchMutateContext.get(); + } + + private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) { + this.batchMutateContext.remove(); } private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
