Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 c7bbdf0e8 -> b43dc09f0
Revert "PHOENIX-3815 Only disable indexes on which write failures occurred (Vincent Poon)" This reverts commit c7bbdf0e8b35fba3d8caff0409712e842acfb042. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b43dc09f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b43dc09f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b43dc09f Branch: refs/heads/4.x-HBase-0.98 Commit: b43dc09f0c9e5046f2121f10a33f5dace3be2925 Parents: c7bbdf0 Author: James Taylor <[email protected]> Authored: Wed Aug 30 17:59:41 2017 -0700 Committer: James Taylor <[email protected]> Committed: Wed Aug 30 17:59:41 2017 -0700 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 25 +- .../phoenix/hbase/index/write/IndexWriter.java | 4 +- .../write/ParallelWriterIndexCommitter.java | 238 ++++++++++++++++++ .../hbase/index/write/RecoveryIndexWriter.java | 1 + .../TrackingParallelWriterIndexCommitter.java | 241 ------------------- .../recovery/StoreFailuresInCachePolicy.java | 1 - .../TrackingParallelWriterIndexCommitter.java | 234 ++++++++++++++++++ .../index/PhoenixIndexFailurePolicy.java | 15 -- .../hbase/index/write/TestIndexWriter.java | 6 +- .../index/write/TestParalleIndexWriter.java | 4 +- .../write/TestParalleWriterIndexCommitter.java | 4 +- 11 files changed, 489 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index df295fa..8bab163 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -46,9 +46,7 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.execute.CommitException; -import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; -import org.apache.phoenix.hbase.index.write.TrackingParallelWriterIndexCommitter; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryConstants; @@ -175,7 +173,7 @@ public class MutableIndexFailureIT extends BaseTest { @Test public void testIndexWriteFailure() throws Exception { String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME; - String thirdIndexName = "C_" + "IDX"; +// String thirdIndexName = "C_" + INDEX_NAME; // String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped)); @@ -199,8 +197,8 @@ public class MutableIndexFailureIT extends BaseTest { // check the drop index. conn.createStatement().execute( "CREATE " + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)"); - conn.createStatement().execute( - "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); +// conn.createStatement().execute( +// "CREATE " + (localIndex ? 
"LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); query = "SELECT * FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); @@ -250,10 +248,6 @@ public class MutableIndexFailureIT extends BaseTest { } else { String indexState = rs.getString("INDEX_STATE"); assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState)); - // non-failing index should remain active - ResultSet thirdRs = conn.createStatement().executeQuery(getSysCatQuery(thirdIndexName)); - assertTrue(thirdRs.next()); - assertEquals(PIndexState.ACTIVE.getSerializedValue(), thirdRs.getString(1)); } assertFalse(rs.next()); @@ -314,7 +308,10 @@ public class MutableIndexFailureIT extends BaseTest { waitForIndexRebuild(conn, fullIndexName, PIndexState.DISABLE); // verify that the index was marked as disabled and the index disable // timestamp set to 0 - String q = getSysCatQuery(indexName); + String q = + "SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '" + + schema + "' AND TABLE_NAME = '" + indexName + "'" + + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL"; try (ResultSet r = conn.createStatement().executeQuery(q)) { assertTrue(r.next()); assertEquals(PIndexState.DISABLE.getSerializedValue(), r.getString(1)); @@ -328,14 +325,6 @@ public class MutableIndexFailureIT extends BaseTest { } } - private String getSysCatQuery(String iName) { - String q = - "SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '" - + schema + "' AND TABLE_NAME = '" + iName + "'" - + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL"; - return q; - } - private void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { if (!transactional) { TestUtil.waitForIndexRebuild(conn, fullIndexName, expectedIndexState); 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java index d8f5527..a037e92 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java @@ -47,7 +47,7 @@ import com.google.common.collect.Multimap; public class IndexWriter implements Stoppable { private static final Log LOG = LogFactory.getLog(IndexWriter.class); - public static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class"; + private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class"; public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class"; private AtomicBoolean stopped = new AtomicBoolean(false); private IndexCommitter writer; @@ -69,7 +69,7 @@ public class IndexWriter implements Stoppable { Configuration conf = env.getConfiguration(); try { IndexCommitter committer = - conf.getClass(INDEX_COMMITTER_CONF_KEY, TrackingParallelWriterIndexCommitter.class, + conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class, IndexCommitter.class).newInstance(); return committer; } catch (InstantiationException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java new file mode 100644 index 
0000000..59a7bd6 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.hbase.index.write; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; +import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; +import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner; +import 
org.apache.phoenix.hbase.index.parallel.Task; +import org.apache.phoenix.hbase.index.parallel.TaskBatch; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.util.IndexUtil; + +import com.google.common.collect.Multimap; + +/** + * Write index updates to the index tables in parallel. We attempt to early exit from the writes if any of the index + * updates fails. Completion is determined by the following criteria: * + * <ol> + * <li>All index writes have returned, OR</li> + * <li>Any single index write has failed</li> + * </ol> + * We attempt to quickly determine if any write has failed and not write to the remaining indexes to ensure a timely + * recovery of the failed index writes. + */ +public class ParallelWriterIndexCommitter implements IndexCommitter { + + public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max"; + private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10; + public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime"; + private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class); + + private HTableFactory factory; + private Stoppable stopped; + private QuickFailingTaskRunner pool; + private KeyValueBuilder kvBuilder; + private RegionCoprocessorEnvironment env; + + public ParallelWriterIndexCommitter() {} + + // For testing + public ParallelWriterIndexCommitter(String hbaseVersion) { + kvBuilder = KeyValueBuilder.get(hbaseVersion); + } + + @Override + public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) { + this.env = env; + Configuration conf = env.getConfiguration(); + 
setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env), + ThreadPoolManager.getExecutor( + new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, + DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env); + this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion()); + } + + /** + * Setup <tt>this</tt>. + * <p> + * Exposed for TESTING + */ + void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) { + this.factory = factory; + this.pool = new QuickFailingTaskRunner(pool); + this.stopped = stop; + } + + @Override + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws SingleIndexWriteFailureException { + /* + * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in + * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets + * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get + * successfully updated, or (2) any one of the index table writes fail; in either case, we should return as + * quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the + * index writes hasn't been started yet (its been queued up, but not submitted to a thread) we want to that task + * to fail immediately as we know that write is a waste and will need to be replayed anyways. + */ + + Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet(); + TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size()); + for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) { + // get the mutations for each table. 
We leak the implementation here a little bit to save + // doing a complete copy over of all the index update for each table. + final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue()); + final HTableInterfaceReference tableReference = entry.getKey(); + if (env != null + && !allowLocalUpdates + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + continue; + } + /* + * Write a batch of index updates to an index table. This operation stops (is cancelable) via two + * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread. + * The former will only work if we are not in the midst of writing the current batch to the table, though we + * do check these status variables before starting and before writing the batch. The latter usage, + * interrupting the thread, will work in the previous situations as was at some points while writing the + * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't + * elaborate when is supports an interrupt). + */ + tasks.add(new Task<Void>() { + + /** + * Do the actual write to the primary table. 
+ * + * @return + */ + @SuppressWarnings("deprecation") + @Override + public Void call() throws Exception { + // this may have been queued, so another task infront of us may have failed, so we should + // early exit, if that's the case + throwFailureIfDone(); + + if (LOG.isTraceEnabled()) { + LOG.trace("Writing index update:" + mutations + " to table: " + tableReference); + } + HTableInterface table = null; + try { + if (allowLocalUpdates + && env != null + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + try { + throwFailureIfDone(); + IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true); + return null; + } catch (IOException ignord) { + // when it's failed we fall back to the standard & slow way + if (LOG.isDebugEnabled()) { + LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" + + ignord); + } + } + } + table = factory.getTable(tableReference.get()); + throwFailureIfDone(); + table.batch(mutations); + } catch (SingleIndexWriteFailureException e) { + throw e; + } catch (IOException e) { + throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e); + } catch (InterruptedException e) { + // reset the interrupt status on the thread + Thread.currentThread().interrupt(); + throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e); + } + finally{ + if (table != null) { + table.close(); + } + } + return null; + } + + private void throwFailureIfDone() throws SingleIndexWriteFailureException { + if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException( + "Pool closed, not attempting to write to the index!", null); } + + } + }); + } + + // actually submit the tasks to the pool and wait for them to finish/fail + try { + pool.submitUninterruptible(tasks); + } catch (EarlyExitFailure e) { + propagateFailure(e); + } catch (ExecutionException e) { + LOG.error("Found a failed index 
update!"); + propagateFailure(e.getCause()); + } + + } + + private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException { + try { + throw throwable; + } catch (SingleIndexWriteFailureException e1) { + throw e1; + } catch (Throwable e1) { + throw new SingleIndexWriteFailureException("Got an abort notification while writing to the index!", e1); + } + + } + + /** + * {@inheritDoc} + * <p> + * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed by the external + * {@link Stoppable}. This call does not delegate the stop down to the {@link Stoppable} passed in the constructor. + * + * @param why + * the reason for stopping + */ + @Override + public void stop(String why) { + LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why); + this.pool.stop(why); + this.factory.shutdown(); + } + + @Override + public boolean isStopped() { + return this.stopped.isStopped(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java index e340784..be542bb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter; 
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java deleted file mode 100644 index 154a90c..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. 
- */ -package org.apache.phoenix.hbase.index.write; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.phoenix.hbase.index.CapturingAbortable; -import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; -import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; -import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; -import org.apache.phoenix.hbase.index.parallel.Task; -import org.apache.phoenix.hbase.index.parallel.TaskBatch; -import org.apache.phoenix.hbase.index.parallel.TaskRunner; -import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; -import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; -import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; -import org.apache.phoenix.hbase.index.table.HTableFactory; -import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; -import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.util.IndexUtil; - -import com.google.common.collect.Multimap; - -/** - * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to allow the caller to - * retrieve the failed and succeeded index updates. 
Therefore, this class will be a lot slower, in the face of failures, - * when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when - * you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL recovery - - * it's not a performance intensive situation and we want to limit the the edits we need to retry. - * <p> - * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that contains the list of - * {@link HTableInterfaceReference} that didn't complete successfully. - * <p> - * Failures to write to the index can happen several different ways: - * <ol> - * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}. This causing any - * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even - * attempted and marked as failures.</li> - * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not - * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li> - * </ol> - * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the - * client. 
- */ -public class TrackingParallelWriterIndexCommitter implements IndexCommitter { - private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class); - - public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max"; - private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10; - private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime"; - - private TaskRunner pool; - private HTableFactory factory; - private CapturingAbortable abortable; - private Stoppable stopped; - private RegionCoprocessorEnvironment env; - private KeyValueBuilder kvBuilder; - - // for testing - public TrackingParallelWriterIndexCommitter(String hbaseVersion) { - kvBuilder = KeyValueBuilder.get(hbaseVersion); - } - - public TrackingParallelWriterIndexCommitter() { - } - - @Override - public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) { - this.env = env; - Configuration conf = env.getConfiguration(); - setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env), - ThreadPoolManager.getExecutor( - new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, - DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( - INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env); - this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion()); - } - - /** - * Setup <tt>this</tt>. 
- * <p> - * Exposed for TESTING - */ - void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, - RegionCoprocessorEnvironment env) { - this.pool = new WaitForCompletionTaskRunner(pool); - this.factory = factory; - this.abortable = new CapturingAbortable(abortable); - this.stopped = stop; - } - - @Override - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException { - Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet(); - TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size()); - List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size()); - for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) { - // get the mutations for each table. We leak the implementation here a little bit to save - // doing a complete copy over of all the index update for each table. - final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue()); - // track each reference so we can get at it easily later, when determing failures - final HTableInterfaceReference tableReference = entry.getKey(); - final RegionCoprocessorEnvironment env = this.env; - if (env != null - && !allowLocalUpdates - && tableReference.getTableName().equals( - env.getRegion().getTableDesc().getNameAsString())) { - continue; - } - tables.add(tableReference); - - /* - * Write a batch of index updates to an index table. This operation stops (is cancelable) via two - * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread. - * The former will only work if we are not in the midst of writing the current batch to the table, though we - * do check these status variables before starting and before writing the batch. 
The latter usage, - * interrupting the thread, will work in the previous situations as was at some points while writing the - * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't - * elaborate when is supports an interrupt). - */ - tasks.add(new Task<Boolean>() { - - /** - * Do the actual write to the primary table. - */ - @SuppressWarnings("deprecation") - @Override - public Boolean call() throws Exception { - HTableInterface table = null; - try { - // this may have been queued, but there was an abort/stop so we try to early exit - throwFailureIfDone(); - if (LOG.isTraceEnabled()) { - LOG.trace("Writing index update:" + mutations + " to table: " + tableReference); - } - if (allowLocalUpdates && env!=null && tableReference.getTableName().equals( - env.getRegion().getTableDesc().getNameAsString())) { - try { - throwFailureIfDone(); - IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true); - return Boolean.TRUE; - } catch (IOException ignord) { - // when it's failed we fall back to the standard & slow way - if (LOG.isTraceEnabled()) { - LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). 
Got error=" - + ignord); - } - } - } - table = factory.getTable(tableReference.get()); - throwFailureIfDone(); - table.batch(mutations); - } catch (InterruptedException e) { - // reset the interrupt status on the thread - Thread.currentThread().interrupt(); - throw e; - } catch (Exception e) { - throw e; - } finally { - if (table != null) { - table.close(); - } - } - return Boolean.TRUE; - } - - private void throwFailureIfDone() throws SingleIndexWriteFailureException { - if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException( - "Pool closed, not attempting to write to the index!", null); } - - } - }); - } - - List<Boolean> results = null; - try { - LOG.debug("Waiting on index update tasks to complete..."); - results = this.pool.submitUninterruptible(tasks); - } catch (ExecutionException e) { - throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e); - } catch (EarlyExitFailure e) { - throw new RuntimeException("Stopped while waiting for batch, quiting!", e); - } - - // track the failures. We only ever access this on return from our calls, so no extra - // synchronization is needed. We could update all the failures as we find them, but that add a - // lot of locking overhead, and just doing the copy later is about as efficient. 
- List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>(); - int index = 0; - for (Boolean result : results) { - // there was a failure - if (result == null) { - // we know which table failed by the index of the result - failures.add(tables.get(index)); - } - index++; - } - - // if any of the tasks failed, then we need to propagate the failure - if (failures.size() > 0) { - // make the list unmodifiable to avoid any more synchronization concerns - throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures)); - } - return; - } - - @Override - public void stop(String why) { - LOG.info("Shutting down " + this.getClass().getSimpleName()); - this.pool.stop(why); - this.factory.shutdown(); - } - - @Override - public boolean isStopped() { - return this.stopped.isStopped(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java index 3558a90..f36affb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java @@ -30,7 +30,6 @@ import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.write.IndexFailurePolicy; import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy; -import org.apache.phoenix.hbase.index.write.TrackingParallelWriterIndexCommitter; /** * Tracks any failed writes in The 
{@link PerRegionIndexWriteCache}, given a http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java new file mode 100644 index 0000000..48ddd25 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index.write.recovery; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.hbase.index.CapturingAbortable; +import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; +import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; +import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; +import org.apache.phoenix.hbase.index.parallel.Task; +import org.apache.phoenix.hbase.index.parallel.TaskBatch; +import org.apache.phoenix.hbase.index.parallel.TaskRunner; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; +import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.write.IndexCommitter; +import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter; +import org.apache.phoenix.util.IndexUtil; + +import com.google.common.collect.Multimap; + +/** + * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have 
attempted to allow the caller to + * retrieve the failed and succeeded index updates. Therefore, this class will be a lot slower, in the face of failures, + * when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when + * you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL recovery - + * it's not a performance intensive situation and we want to limit the edits we need to retry. + * <p> + * On failure to {@link #write(Multimap, boolean)}, we throw a {@link MultiIndexWriteFailureException} that contains the list of + * {@link HTableInterfaceReference} that didn't complete successfully. + * <p> + * Failures to write to the index can happen in several different ways: + * <ol> + * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}). This causes any + * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even + * attempted and marked as failures.</li> + * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not + * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li> + * </ol> + * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the + * client. 
+ */ +public class TrackingParallelWriterIndexCommitter implements IndexCommitter { + private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class); + + public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max"; + private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10; + private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.trackingwriter.threads.keepalivetime"; + + private TaskRunner pool; + private HTableFactory factory; + private CapturingAbortable abortable; + private Stoppable stopped; + private RegionCoprocessorEnvironment env; + + @Override + public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) { + this.env = env; + Configuration conf = env.getConfiguration(); + setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env), + ThreadPoolManager.getExecutor( + new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, + DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env); + } + + /** + * Setup <tt>this</tt>. 
+ * <p> + * Exposed for TESTING + */ + void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, + RegionCoprocessorEnvironment env) { + this.pool = new WaitForCompletionTaskRunner(pool); + this.factory = factory; + this.abortable = new CapturingAbortable(abortable); + this.stopped = stop; + } + + @Override + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException { + Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet(); + TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size()); + List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size()); + for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) { + // get the mutations for each table. We leak the implementation here a little bit to save + // doing a complete copy over of all the index update for each table. + final List<Mutation> mutations = (List<Mutation>)entry.getValue(); + // track each reference so we can get at it easily later, when determing failures + final HTableInterfaceReference tableReference = entry.getKey(); + final RegionCoprocessorEnvironment env = this.env; + if (env != null + && !allowLocalUpdates + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + continue; + } + tables.add(tableReference); + + /* + * Write a batch of index updates to an index table. This operation stops (is cancelable) via two + * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread. + * The former will only work if we are not in the midst of writing the current batch to the table, though we + * do check these status variables before starting and before writing the batch. 
The latter usage, + * interrupting the thread, will work in the previous situations as was at some points while writing the + * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't + * elaborate when is supports an interrupt). + */ + tasks.add(new Task<Boolean>() { + + /** + * Do the actual write to the primary table. + */ + @SuppressWarnings("deprecation") + @Override + public Boolean call() throws Exception { + HTableInterface table = null; + try { + // this may have been queued, but there was an abort/stop so we try to early exit + throwFailureIfDone(); + if (LOG.isTraceEnabled()) { + LOG.trace("Writing index update:" + mutations + " to table: " + tableReference); + } + if (allowLocalUpdates && env!=null && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + try { + throwFailureIfDone(); + IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true); + return Boolean.TRUE; + } catch (IOException ignord) { + // when it's failed we fall back to the standard & slow way + if (LOG.isTraceEnabled()) { + LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). 
Got error=" + + ignord); + } + } + } + table = factory.getTable(tableReference.get()); + throwFailureIfDone(); + table.batch(mutations); + } catch (InterruptedException e) { + // reset the interrupt status on the thread + Thread.currentThread().interrupt(); + throw e; + } catch (Exception e) { + throw e; + } finally { + if (table != null) { + table.close(); + } + } + return Boolean.TRUE; + } + + private void throwFailureIfDone() throws SingleIndexWriteFailureException { + if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException( + "Pool closed, not attempting to write to the index!", null); } + + } + }); + } + + List<Boolean> results = null; + try { + LOG.debug("Waiting on index update tasks to complete..."); + results = this.pool.submitUninterruptible(tasks); + } catch (ExecutionException e) { + throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e); + } catch (EarlyExitFailure e) { + throw new RuntimeException("Stopped while waiting for batch, quiting!", e); + } + + // track the failures. We only ever access this on return from our calls, so no extra + // synchronization is needed. We could update all the failures as we find them, but that add a + // lot of locking overhead, and just doing the copy later is about as efficient. 
+ List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>(); + int index = 0; + for (Boolean result : results) { + // there was a failure + if (result == null) { + // we know which table failed by the index of the result + failures.add(tables.get(index)); + } + index++; + } + + // if any of the tasks failed, then we need to propagate the failure + if (failures.size() > 0) { + // make the list unmodifiable to avoid any more synchronization concerns + throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures)); + } + return; + } + + @Override + public void stop(String why) { + LOG.info("Shutting down " + this.getClass().getSimpleName()); + this.pool.stop(why); + this.factory.shutdown(); + } + + @Override + public boolean isStopped() { + return this.stopped.isStopped(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index a36e7bb..c91e36e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; -import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy; import 
org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy; @@ -60,7 +59,6 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; /** * @@ -167,20 +165,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { // start by looking at all the tables to which we attempted to write long timestamp = 0; boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure; - // if using TrackingParallelWriter, we know which indexes failed and only disable those - Set<HTableInterfaceReference> failedTables = Sets.newHashSetWithExpectedSize(attempted.size()); - try { - throw cause; - } catch (MultiIndexWriteFailureException miwfe) { - failedTables.addAll(miwfe.getFailedTables()); - } catch (Exception e) { - // do nothing - } - for (HTableInterfaceReference ref : refs) { - if (failedTables.size() > 0 && !failedTables.contains(ref)) { - continue; // leave index active if its writes succeeded - } long minTimeStamp = 0; // get the minimum timestamp across all the mutations we attempted on that table http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java index 6c1d8f1..88e4a96 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java @@ -131,7 +131,7 @@ public class TestIndexWriter { tables.put(new ImmutableBytesPtr(tableName), table); // setup the writer and failure policy - TrackingParallelWriterIndexCommitter committer = new 
TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); committer.setup(factory, exec, abort, stop, e); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); @@ -199,7 +199,7 @@ public class TestIndexWriter { tables.put(new ImmutableBytesPtr(tableName), table); tables.put(new ImmutableBytesPtr(tableName2), table2); - TrackingParallelWriterIndexCommitter committer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); committer.setup(factory, exec, abort, stop, e); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); @@ -279,7 +279,7 @@ public class TestIndexWriter { indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName)); // setup the writer - TrackingParallelWriterIndexCommitter committer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); committer.setup(factory, exec, abort, stop, e ); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java index 3e2b47c..e62af7a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java @@ -66,7 +66,7 @@ 
public class TestParalleIndexWriter { Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); FakeTableFactory factory = new FakeTableFactory( Collections.<ImmutableBytesPtr, HTableInterface> emptyMap()); - TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); Abortable mockAbort = Mockito.mock(Abortable.class); Stoppable mockStop = Mockito.mock(Stoppable.class); // create a simple writer @@ -116,7 +116,7 @@ public class TestParalleIndexWriter { tables.put(tableName, table); // setup the writer and failure policy - TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); writer.setup(factory, exec, abort, stop, e); writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! 
Likely a race condition tripped", http://git-wip-us.apache.org/repos/asf/phoenix/blob/b43dc09f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java index 32a6661..789e7a1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java @@ -62,7 +62,7 @@ public class TestParalleWriterIndexCommitter { ExecutorService exec = Executors.newFixedThreadPool(1); FakeTableFactory factory = new FakeTableFactory( Collections.<ImmutableBytesPtr, HTableInterface> emptyMap()); - TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); Abortable mockAbort = Mockito.mock(Abortable.class); Stoppable mockStop = Mockito.mock(Stoppable.class); RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); @@ -117,7 +117,7 @@ public class TestParalleWriterIndexCommitter { tables.put(tableName, table); // setup the writer and failure policy - TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion()); + ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); writer.setup(factory, exec, abort, stop, e); writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
