Repository: phoenix
Updated Branches:
  refs/heads/master 944bed735 -> 94601de5f
PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d13a2e5b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d13a2e5b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d13a2e5b

Branch: refs/heads/master
Commit: d13a2e5b27db8d22344442a9fc9890a37052f0f9
Parents: 944bed7
Author: James Taylor <[email protected]>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Mon Sep 25 18:52:48 2017 -0700

----------------------------------------------------------------------
 .../index/PhoenixTransactionalIndexer.java | 79 +++++++++++++++++---
 1 file changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d13a2e5b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5444360..969378d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.phoenix.index;
 
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import
java.util.ArrayList; import java.util.Collection; @@ -36,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; @@ -99,6 +105,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class); + // Hack to get around not being able to save any state between + // coprocessor calls. TODO: remove after HBASE-18127 when available + private static class BatchMutateContext { + public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList(); + } + + private ThreadLocal<BatchMutateContext> batchMutateContext = + new ThreadLocal<BatchMutateContext>(); + private PhoenixIndexCodec codec; private IndexWriter writer; private boolean stopped; @@ -117,9 +132,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { */ clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, + // which by default uses a multiplier of 10. 
That is too many retries for our synchronous index writes + clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, + DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); + clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration() + .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); // setup the actual index writer - // setup the actual index writer // For transactional tables, we keep the index active upon a write failure // since we have the all versus none behavior for transactions. this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer"); @@ -154,6 +175,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { }; } + @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { @@ -164,6 +186,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return; } + BatchMutateContext context = new BatchMutateContext(); + setBatchMutateContext(c, context); + Map<String,byte[]> updateAttributes = m.getAttributesMap(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); @@ -176,15 +201,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } // get the index updates for all elements in this batch - indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); + context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); current.addTimelineAnnotation("Built index updates, doing preStep"); - 
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); - - // no index updates, so we are done - if (!indexUpdates.isEmpty()) { - this.writer.write(indexUpdates, true); - } + TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size()); } catch (Throwable t) { String msg = "Failed to update index with entries:" + indexUpdates; LOG.error(msg, t); @@ -192,7 +212,48 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } - public static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { + @Override + public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException { + BatchMutateContext context = getBatchMutateContext(c); + if (context == null || context.indexUpdates == null) { + return; + } + // get the current span, or just use a null-span to avoid a bunch of if statements + try (TraceScope scope = Trace.startSpan("Starting to write index updates")) { + Span current = scope.getSpan(); + if (current == null) { + current = NullSpan.INSTANCE; + } + + if (success) { // if miniBatchOp was successfully written, write index updates + if (!context.indexUpdates.isEmpty()) { + this.writer.write(context.indexUpdates, true); + } + current.addTimelineAnnotation("Wrote index updates"); + } + } catch (Throwable t) { + String msg = "Failed to write index updates:" + context.indexUpdates; + LOG.error(msg, t); + ServerUtil.throwIOException(msg, t); + } finally { + removeBatchMutateContext(c); + } + } + + private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) { + this.batchMutateContext.set(context); + } + + private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) { + return this.batchMutateContext.get(); + } + + private void 
removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) { + this.batchMutateContext.remove(); + } + + private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { MultiMutation stored = mutations.get(row); // we haven't seen this row before, so add it if (stored == null) {
