Repository: phoenix
Updated Branches:
  refs/heads/master 9c458fa3d -> 54d9e1c36
PHOENIX-4053 Lock row exclusively when necessary for mutable secondary indexing Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54d9e1c3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54d9e1c3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54d9e1c3 Branch: refs/heads/master Commit: 54d9e1c36c46e7c50c29def08cf866599c7a4e45 Parents: 9c458fa Author: James Taylor <[email protected]> Authored: Mon Jul 31 10:57:22 2017 -0700 Committer: James Taylor <[email protected]> Committed: Mon Jul 31 10:57:22 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/ConcurrentMutationsIT.java | 405 +++++++++++++++++++ .../org/apache/phoenix/hbase/index/Indexer.java | 61 ++- .../apache/phoenix/hbase/index/LockManager.java | 252 ++++++++++++ .../hbase/index/builder/IndexBuildManager.java | 12 +- .../java/org/apache/phoenix/util/TestUtil.java | 2 + 5 files changed, 713 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java new file mode 100644 index 0000000..19cb70e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT { + private static final Random RAND = new Random(); + private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_"; + private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_"; + private static final int ROW_LOCK_WAIT_TIME = 10000; + + private final Object lock = new Object(); + private long scn = 100; + + private static void addDelayingCoprocessor(Connection conn, String tableName) throws SQLException, IOException { + int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100; + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); + descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority, null); + HBaseAdmin admin = services.getAdmin(); + try { + admin.modifyTable(Bytes.toBytes(tableName), descriptor); + } finally { + admin.close(); + } + } + + @Test + @Ignore + public void testSynchronousDeletesAndUpsertValues() throws Exception { + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))"); + addDelayingCoprocessor(conn, tableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + final CountDownLatch doneSignal = new CountDownLatch(2); + Runnable r1 = new Runnable() { + + @Override + public void run() { + try { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + for (int i = 0; i < 50; i++) { + Thread.sleep(20); + synchronized (lock) { + scn += 10; + PhoenixConnection conn = null; + try { + 
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); + conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class); + conn.setAutoCommit(true); + conn.createStatement().execute("DELETE FROM " + tableName); + } finally { + if (conn != null) conn.close(); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override + public void run() { + try { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + int nRowsToUpsert = 1000; + for (int i = 0; i < nRowsToUpsert; i++) { + synchronized(lock) { + scn += 10; + PhoenixConnection conn = null; + try { + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); + conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)"); + if ((i % 20) == 0 || i == nRowsToUpsert-1 ) { + conn.commit(); + } + } finally { + if (conn != null) conn.close(); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(60, TimeUnit.SECONDS); + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2); + } + + @Test + @Ignore + public void testConcurrentDeletesAndUpsertValues() throws Exception { + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))"); + addDelayingCoprocessor(conn, tableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + final CountDownLatch doneSignal = new CountDownLatch(2); + Runnable r1 = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.setAutoCommit(true); + for (int i = 0; i < 50; i++) { + Thread.sleep(20); + conn.createStatement().execute("DELETE FROM " + tableName); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + for (int i = 0; i < 1000; i++) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)"); + if ((i % 20) == 0) { + conn.commit(); + } + } + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(60, TimeUnit.SECONDS); + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 
== count2); + } + + @Test + public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception { + final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + + conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER)"); + addDelayingCoprocessor(conn, tableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v)"); + final CountDownLatch doneSignal = new CountDownLatch(2); + final String[] failedMsg = new String[1]; + Runnable r1 = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',3)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); + assertNull(failedMsg[0], failedMsg[0]); + } + + @Test + public void testLockUntilMVCCAdvanced() throws Exception { + final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX + generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER)"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v,k)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)"); + conn.commit(); + addDelayingCoprocessor(conn, tableName); + final CountDownLatch doneSignal = new CountDownLatch(2); + final String[] failedMsg = new String[1]; + Runnable r1 = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + 
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2); + + ResultSet rs1 = conn.createStatement().executeQuery("SELECT * FROM " + indexName); + assertTrue(rs1.next()); + ResultSet rs2 = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE k = '" + rs1.getString(2) + "'"); + assertTrue("Could not find row in table where k = '" + rs1.getString(2) + "'", rs2.next()); + assertEquals(rs1.getInt(1), rs2.getInt(2)); + assertFalse(rs1.next()); + assertFalse(rs2.next()); + } + + private static long getRowCount(Connection conn, String tableName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName); + assertTrue(rs.next()); + return rs.getLong(1); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(10); + clientProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, Integer.toString(0)); + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); + serverProps.put("hbase.rowlock.wait.duration", Integer.toString(ROW_LOCK_WAIT_TIME)); + serverProps.put(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(3)); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + + public static class DelayingRegionObserver extends SimpleRegionObserver { + private volatile boolean lockedTableRow; + + @Override + public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + try { + String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString(); + if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) { + Thread.sleep(ROW_LOCK_WAIT_TIME/2); // Wait long enough that they'll both have the same mvcc + } + } catch (InterruptedException e) { + } + } + + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + try { + String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString(); + if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) { + if (lockedTableRow) { + throw new DoNotRetryIOException("Expected lock in preBatchMutate to be exclusive, but it wasn't for row " + Bytes.toStringBinary(miniBatchOp.getOperation(0).getRow())); + } + lockedTableRow = true; + Thread.sleep(ROW_LOCK_WAIT_TIME + 2000); + } + Thread.sleep(Math.abs(RAND.nextInt()) % 10); + } catch (InterruptedException e) { + } finally { + lockedTableRow = false; + } + + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index ea5bf4f..8523977 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -117,6 +117,7 @@ public class Indexer extends 
BaseRegionObserver { protected IndexWriter writer; protected IndexBuildManager builder; + private LockManager lockManager; /** Configuration key for the {@link IndexBuilder} to use */ public static final String INDEX_BUILDER_CONF_KEY = "index.builder"; @@ -162,16 +163,19 @@ public class Indexer extends BaseRegionObserver { private long slowPreWALRestoreThreshold; private long slowPostOpenThreshold; private long slowPreIncrementThreshold; - + private int rowLockWaitDuration; + public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY; - public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil + public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil .encodeMaxPatchVersion(0, 94); - public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil + public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil .encodeVersion("0.94.0"); - private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil + private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil .encodeVersion("0.94.9"); + private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + @Override public void start(CoprocessorEnvironment e) throws IOException { try { @@ -206,6 +210,10 @@ public class Indexer extends BaseRegionObserver { DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); // setup the actual index writer this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer"); + + this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration", + DEFAULT_ROWLOCK_WAIT_DURATION); + this.lockManager = new LockManager(); // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); @@ -346,8 +354,9 @@ public class Indexer extends BaseRegionObserver { "Somehow didn't return an index update but also didn't propagate the failure to the client!"); } - private static final OperationStatus SUCCESS = new OperationStatus(OperationStatusCode.SUCCESS); - + private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS); + private static final OperationStatus FAILURE = new OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock"); + public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { @@ -365,7 +374,7 @@ public class Indexer extends BaseRegionObserver { for (int i = 0; i < miniBatchOp.size(); i++) { Mutation m = miniBatchOp.getOperation(i); if (this.builder.isAtomicOp(m)) { - miniBatchOp.setOperationStatus(i, SUCCESS); + miniBatchOp.setOperationStatus(i, IGNORE); continue; } // skip this mutation if we aren't enabling indexing @@ -373,13 +382,40 @@ public class Indexer extends BaseRegionObserver { // should be indexed, which means we need to expose another method on the builder. Such is the // way optimization go though. if (this.builder.isEnabled(m)) { + boolean success = false; + try { + lockManager.lockRow(m.getRow(), rowLockWaitDuration); + success = true; + } finally { + if (!success) { + // We're throwing here as a result of either a timeout while waiting + // for the row lock or an interrupt. Either way, the lock on the + // current row was unsuccessful and we won't be locking any more rows + // since we're throwing. 
By setting the operation status to FAILURE + // here, we prevent the attempt to unlock rows we've never locked when + // postBatchMutateIndispensably is executed. We're very limited wrt + // the state that can be shared between the batch mutate coprocessor + // calls (see HBASE-18127). + // Note that we shouldn't necessarily be throwing here, since we're + // essentially failing the data write because we can't do the locking + // necessary for performing consistent index maintenance. We'd ideally + // want to go through the index failure policy to determine what action + // to perform. We currently cannot ignore this lock failure as we lack + // the ability to keep that state (PHOENIX-4055). + for (int j = i; j < miniBatchOp.size(); j++) { + miniBatchOp.setOperationStatus(j,FAILURE); + } + } + } Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? defaultDurability : m.getDurability(); if (effectiveDurablity.ordinal() > durability.ordinal()) { durability = effectiveDurablity; } - // add the mutation to the batch set + // TODO: remove this code as Phoenix prevents any duplicate + // rows in the batch mutation from the client side (PHOENIX-4054). + // Add the mutation to the batch set ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); MultiMutation stored = mutations.get(row); // we haven't seen this row before, so add it @@ -390,7 +426,7 @@ public class Indexer extends BaseRegionObserver { stored.addAll(m); } } - + // early exit if it turns out we don't have any edits if (mutations.isEmpty()) { return; @@ -403,6 +439,7 @@ public class Indexer extends BaseRegionObserver { edit = new WALEdit(); miniBatchOp.setWalEdit(0, edit); } + // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { @@ -488,6 +525,12 @@ public class Indexer extends BaseRegionObserver { } long start = EnvironmentEdgeManager.currentTimeMillis(); try { + for (int i = 0; i < miniBatchOp.size(); i++) { + OperationStatus status = miniBatchOp.getOperationStatus(i); + if (status != IGNORE && status != FAILURE) { + lockManager.unlockRow(miniBatchOp.getOperation(i).getRow()); + } + } this.builder.batchCompleted(miniBatchOp); if (success) { // if miniBatchOp was successfully written, write index updates http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java new file mode 100644 index 0000000..02e4c3c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + +/** + * + * Class, copied for the most part from HRegion.getRowLockInternal implementation + * that manages reentrant row locks based on the row key. Phoenix needs to manage + * its own locking due to secondary indexes needing a consistent snapshot from + * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053). + * + */ +public class LockManager { + private static final Log LOG = LogFactory.getLog(LockManager.class); + + private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> lockedRows = + new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>(); + + public LockManager () { + } + + /** + * Lock the row or throw otherwise + * @param row the row key + * @return RowLock used to eventually release the lock + * @throws TimeoutIOException if the lock could not be acquired within the + * allowed rowLockWaitDuration and InterruptedException if interrupted while + * waiting to acquire lock. + */ + public RowLock lockRow(byte[] row, int waitDuration) throws IOException { + // create an object to use as a key in the row lock map + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row); + + RowLockContext rowLockContext = null; + RowLockImpl result = null; + TraceScope traceScope = null; + + // If we're tracing, start a span to show how long this took. + if (Trace.isTracing()) { + traceScope = Trace.startSpan("LockManager.getRowLock"); + traceScope.getSpan().addTimelineAnnotation("Getting a lock"); + } + + boolean success = false; + try { + // Keep trying until we have a lock or error out. + // TODO: do we need to add a time component here? + while (result == null) { + + // Try adding a RowLockContext to the lockedRows. + // If we can add it, then there are no other transactions currently running. + rowLockContext = new RowLockContext(rowKey); + RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); + + // if there was a running transaction then there's already a context.
+ if (existingContext != null) { + rowLockContext = existingContext; + } + + result = rowLockContext.newRowLock(); + } + if (!result.getLock().tryLock(waitDuration, TimeUnit.MILLISECONDS)) { + if (traceScope != null) { + traceScope.getSpan().addTimelineAnnotation("Failed to get row lock"); + } + throw new TimeoutIOException("Timed out waiting for lock for row: " + rowKey); + } + rowLockContext.setThreadName(Thread.currentThread().getName()); + success = true; + return result; + } catch (InterruptedException ie) { + LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(ie); + if (traceScope != null) { + traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock"); + } + Thread.currentThread().interrupt(); + throw iie; + } finally { + // On failure, clean up the counts just in case this was the thing keeping the context alive. + if (!success && rowLockContext != null) rowLockContext.cleanUp(); + if (traceScope != null) { + traceScope.close(); + } + } + } + + /** + * Unlock the row. We need this stateless way of unlocking because + * we have no means of passing the RowLock instances between + * coprocessor calls (see HBASE-18482). Once we have that, we + * can have the caller collect RowLock instances and free when + * needed. + * @param row the row key + * @throws IOException + */ + public void unlockRow(byte[] row) throws IOException { + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row); + RowLockContext lockContext = lockedRows.get(rowKey); + if (lockContext != null) { + lockContext.releaseRowLock(); + } + } + + class RowLockContext { + private final ImmutableBytesPtr rowKey; + // TODO: consider making this non atomic. It's only saving one + // synchronization in the case of cleanup() when more than one + // thread is holding on to the lock. + private final AtomicInteger count = new AtomicInteger(0); + private final ReentrantLock reentrantLock = new ReentrantLock(true); + // TODO: remove once we can pass List<RowLock> as needed through + // coprocessor calls. + private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED; + private String threadName; + + RowLockContext(ImmutableBytesPtr rowKey) { + this.rowKey = rowKey; + } + + RowLockImpl newRowLock() { + count.incrementAndGet(); + synchronized (this) { + if (rowLock != null) { + rowLock = new RowLockImpl(this, reentrantLock); + return rowLock; + } else { + return null; + } + } + } + + void releaseRowLock() { + synchronized (this) { + if (rowLock != null) { + rowLock.release(); + } + } + } + + void cleanUp() { + long c = count.decrementAndGet(); + if (c <= 0) { + synchronized (this) { + if (count.get() <= 0 && rowLock != null){ + rowLock = null; + RowLockContext removed = lockedRows.remove(rowKey); + assert removed == this: "we should never remove a different context"; + } + } + } + } + + void setThreadName(String threadName) { + this.threadName = threadName; + } + + @Override + public String toString() { + return "RowLockContext{" + + "row=" + rowKey + + ", readWriteLock=" + reentrantLock + + ", count=" + count + + ", threadName=" + threadName + + '}'; + } + } + + /** + * Class used to represent a lock on a row. 
+ */ + public static class RowLockImpl implements RowLock { + static final RowLockImpl UNINITIALIZED = new RowLockImpl(); + private final RowLockContext context; + private final Lock lock; + + private RowLockImpl() { + context = null; + lock = null; + } + + RowLockImpl(RowLockContext context, Lock lock) { + this.context = context; + this.lock = lock; + } + + Lock getLock() { + return lock; + } + + @Override + public void release() { + lock.unlock(); + context.cleanUp(); + } + + @Override + public String toString() { + return "RowLockImpl{" + + "context=" + context + + ", lock=" + lock + + '}'; + } + } + + /** + * Row lock held by a given thread. + * One thread may acquire multiple locks on the same row simultaneously. + * The locks must be released by calling release() from the same thread. + */ + public interface RowLock { + /** + * Release the given lock. If there are no remaining locks held by the current thread + * then unlock the row and allow other threads to acquire the lock. + * @throws IllegalArgumentException if called by a different thread than the lock owning + * thread + */ + void release(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index c015a77..0567d35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,12 +34,6 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner; -import org.apache.phoenix.hbase.index.parallel.Task; -import org.apache.phoenix.hbase.index.parallel.TaskBatch; -import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; - -import com.google.common.util.concurrent.MoreExecutors; /** * Manage the building of index updates from primary table updates. @@ -90,7 +82,8 @@ public class IndexBuildManager implements Stoppable { // Avoid the Object overhead of the executor when it's not actually parallelizing anything. 
ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size()); for (Mutation m : mutations) { - results.addAll(delegate.getIndexUpdate(m, indexMetaData)); + Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData); + results.addAll(updates); } return results; } @@ -139,5 +132,4 @@ public class IndexBuildManager implements Stoppable { public IndexBuilder getBuilderForTesting() { return this.delegate; } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 012c663..8a5a8e4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -838,6 +838,8 @@ public class TestUtil { public static void dumpTable(HTableInterface table) throws IOException { System.out.println("************ dumping " + table + " **************"); Scan s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); try (ResultScanner scanner = table.getScanner(s)) { Result result = null; while ((result = scanner.next()) != null) {
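A minimal usage sketch (not part of this commit) of how the new LockManager pairs with the Indexer changes above: an exclusive lock on the data row is taken in preBatchMutateWithExceptions before index updates are built, and released in postBatchMutateIndispensably once the batch completes. Only LockManager, lockRow, unlockRow and Bytes come from this patch; the class name, method name, row key and wait duration below are illustrative.

    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.LockManager;

    public class LockManagerUsageSketch {
        private final LockManager lockManager = new LockManager();
        // Illustrative value; the Indexer reads "hbase.rowlock.wait.duration" from its
        // config, defaulting to 30000 ms.
        private final int rowLockWaitDuration = 30000;

        void maintainIndexFor(byte[] dataRowKey) throws IOException {
            // Taken in preBatchMutate: blocks other writers to the same data row until
            // the index updates derived from this mutation have been applied.
            lockManager.lockRow(dataRowKey, rowLockWaitDuration); // TimeoutIOException on timeout
            try {
                // ... build and write the index updates for this data row ...
            } finally {
                // Released in postBatchMutateIndispensably; the unlock is keyed only by the
                // row key because RowLock instances can't be carried between coprocessor calls.
                lockManager.unlockRow(dataRowKey);
            }
        }

        public static void main(String[] args) throws IOException {
            new LockManagerUsageSketch().maintainIndexFor(Bytes.toBytes("example-row"));
        }
    }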
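The RowLock interface added here is reentrant for the owning thread ("One thread may acquire multiple locks on the same row simultaneously"). A hypothetical illustration of that behavior, using only the lockRow and release methods from this commit; the row key and timeouts are made up.

    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.LockManager;
    import org.apache.phoenix.hbase.index.LockManager.RowLock;

    public class RowLockReentrancySketch {
        public static void main(String[] args) throws IOException {
            LockManager lockManager = new LockManager();
            byte[] row = Bytes.toBytes("k1"); // illustrative row key

            // The same thread may lock the same row more than once...
            RowLock first = lockManager.lockRow(row, 1000);
            RowLock second = lockManager.lockRow(row, 1000);

            // ...and must release each acquisition; other threads can acquire the
            // lock only after the last release.
            second.release();
            first.release();
        }
    }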
