gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent 
Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r280635199
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
 ##########
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import 
org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.util.IndexUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Abstract class to Write index updates to the index tables in parallel.
+ */
+public abstract class AbstractParallelWriterIndexCommitter implements 
IndexCommitter {
+
+    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = 
"index.writer.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+    public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = 
"index.writer.threads.keepalivetime";
+    private static final Log LOG = 
LogFactory.getLog(AbstractParallelWriterIndexCommitter.class);
+
+    protected HTableFactory retryingFactory;
+    protected HTableFactory noRetriesfactory;
+    protected Stoppable stopped;
+    protected QuickFailingTaskRunner pool;
+    protected KeyValueBuilder kvBuilder;
+    protected RegionCoprocessorEnvironment env;
+    protected TaskBatch<Void> tasks;
+
+    public AbstractParallelWriterIndexCommitter() {}
+
+    // For testing
+    public AbstractParallelWriterIndexCommitter(String hbaseVersion) {
+        kvBuilder = KeyValueBuilder.get(hbaseVersion);
+    }
+
+    @Override
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, 
String name) {
+        this.env = env;
+        Configuration conf = env.getConfiguration();
+        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+                ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder(name, 
conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+                                
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), 
parent, env);
+        this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+    }
+
+    /**
+     * Setup <tt>this</tt>.
+     * <p>
+     * Exposed for TESTING
+     */
+    public void setup(HTableFactory factory, ExecutorService pool,Stoppable 
stop, RegionCoprocessorEnvironment env) {
+        this.retryingFactory = factory;
+        this.noRetriesfactory = 
IndexWriterUtils.getNoRetriesHTableFactory(env);
+        this.pool = new QuickFailingTaskRunner(pool);
+        this.stopped = stop;
+    }
+
+    abstract HTableFactory getHTableFactory (int clientVersion);
+
+    @Override
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, 
final boolean allowLocalUpdates, final int clientVersion) throws 
SingleIndexWriteFailureException {
+        /*
+         * This bit here is a little odd, so let's explain what's going on. 
Basically, we want to do the writes in
+         * parallel to each index table, so each table gets its own task and 
is submitted to the pool. Where it gets
+         * tricky is that we want to block the calling thread until one of two 
things happens: (1) all index tables get
+         * successfully updated, or (2) any one of the index table writes 
fail; in either case, we should return as
+         * quickly as possible. We get a little more complicated in that if we 
do get a single failure, but any of the
+         * index writes hasn't been started yet (its been queued up, but not 
submitted to a thread) we want to that task
+         * to fail immediately as we know that write is a waste and will need 
to be replayed anyways.
+         */
+
+        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = 
toWrite.asMap().entrySet();
+        tasks = new TaskBatch<Void>(entries.size());
+        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : 
entries) {
+            // get the mutations for each table. We leak the implementation 
here a little bit to save
+            // doing a complete copy over of all the index update for each 
table.
+            final List<Mutation> mutations = 
kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
+            final HTableInterfaceReference tableReference = entry.getKey();
+                       if (env != null
+                                       && !allowLocalUpdates
+                                       && tableReference.getTableName().equals(
+                                                       
env.getRegion().getTableDescriptor().getTableName().getNameAsString())) {
+                               continue;
+                       }
+            /*
+             * Write a batch of index updates to an index table. This 
operation stops (is cancelable) via two
+             * mechanisms: (1) setting aborted or stopped on the IndexWriter 
or, (2) interrupting the running thread.
+             * The former will only work if we are not in the midst of writing 
the current batch to the table, though we
+             * do check these status variables before starting and before 
writing the batch. The latter usage,
+             * interrupting the thread, will work in the previous situations 
as was at some points while writing the
+             * batch, depending on the underlying writer implementation 
(HTableInterface#batch is blocking, but doesn't
+             * elaborate when is supports an interrupt).
+             */
+            tasks.add(new Task<Void>() {
+
+                /**
+                 * Do the actual write to the primary table.
+                 * 
+                 * @return
+                 */
+                @SuppressWarnings("deprecation")
+                @Override
+                public Void call() throws Exception {
+                    Table table = null;
+                    // this may have been queued, so another task infront of 
us may have failed, so we should
+                    // early exit, if that's the case
+                    throwFailureIfDone();
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Writing index update:" + mutations + " to 
table: " + tableReference);
+                    }
+                    try {
+                        if (allowLocalUpdates
+                                && env != null
+                                && tableReference.getTableName().equals(
+                                    
env.getRegion().getTableDescriptor().getTableName().getNameAsString())) {
+                            try {
+                                throwFailureIfDone();
+                                IndexUtil.writeLocalUpdates(env.getRegion(), 
mutations, true);
+                                return null;
+                            } catch (IOException ignored) {
+                                // when it's failed we fall back to the 
standard & slow way
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("indexRegion.batchMutate failed 
and fall back to HTable.batch(). Got error="
+                                            + ignored);
+                                }
+                            }
+                        }
+                     // if the client can retry index writes, then we don't 
need to retry here
 
 Review comment:
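   nit: spacing - the "// if the client can retry index writes ..." comment line above is indented 21 spaces, while the other statements in the enclosing try block are indented 24; please align it with the surrounding code.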

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to