Repository: incubator-atlas Updated Branches: refs/heads/master 59268875d -> 83d053978
ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/83d05397 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/83d05397 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/83d05397 Branch: refs/heads/master Commit: 83d053978873e988604de2f07021878b5b987764 Parents: 5926887 Author: Hemanth Yamijala <[email protected]> Authored: Thu Jun 2 15:05:56 2016 +0530 Committer: Hemanth Yamijala <[email protected]> Committed: Thu Jun 2 15:05:56 2016 +0530 ---------------------------------------------------------------------- distro/src/conf/atlas-application.properties | 4 + docs/src/site/twiki/Configuration.twiki | 15 ++ release-log.txt | 1 + .../hbase/HBaseKeyColumnValueStore.java | 43 ++++-- .../hbase/HBaseKeyColumnValueStoreTest.java | 139 +++++++++++++++++++ 5 files changed, 194 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index bfa40e8..1cdd424 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -119,3 +119,7 @@ atlas.auth.policy.file=${sys:atlas.home}/conf/policy-store.txt #########authorizer impl class ######### atlas.authorizer.impl=SIMPLE + +######### Performance Configs ######### +#atlas.graph.storage.lock.retries=10 +#atlas.graph.storage.cache.db-cache-time=120000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/docs/src/site/twiki/Configuration.twiki 
---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki index 2884f42..7150483 100644 --- a/docs/src/site/twiki/Configuration.twiki +++ b/docs/src/site/twiki/Configuration.twiki @@ -220,3 +220,18 @@ atlas.client.ha.sleep.interval.ms=5000 # Set the following property to true, to enable the setup steps to run on each server start. Default = false. atlas.server.run.setup.on.start=false </verbatim> + +---++ Performance configuration items + +The following properties can be used to tune the performance of Atlas under specific circumstances: + +<verbatim> +# The number of times Atlas code tries to acquire a lock (to ensure consistency) while committing a transaction. +# This should be related to the amount of concurrency expected to be supported by the server. For example, with retries set to 10, up to 100 threads can concurrently create types in the Atlas system. +# If this is set to a low value (default is 3), concurrent operations might fail with a PermanentLockingException. +atlas.graph.storage.lock.retries=10 + +# Milliseconds to wait before evicting a cached entry. This should be > atlas.graph.storage.lock.wait-time x atlas.graph.storage.lock.retries +# If this is set to a low value (default is 10000), warnings about transactions taking too long will appear in the Atlas application log. 
+atlas.graph.storage.cache.db-cache-time=120000 +</verbatim> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 8077741..86cff05 100644 --- a/release-log.txt +++ b/release-log.txt @@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth) ATLAS-766 Atlas policy file does not honour standard hash as comment format ( saqeeb.s via sumasai ) ATLAS-843 Atlas UI: Feature to search terms in left navigation. (Kalyanikashikar via sumasai) ATLAS-731 Remove dashboard module in Atlas, replaced by dashboardv2 (kevalbhatt18 via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java index b4dc12e..fc8f2c4 100644 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java @@ -20,6 +20,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.thinkaurelius.titan.core.attribute.Duration; import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; import 
com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException; @@ -50,8 +51,6 @@ import java.util.*; import java.util.Map; import java.util.concurrent.TimeUnit; -import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*; - /** * Here are some areas that might need work: * <p/> @@ -85,7 +84,9 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore { private LocalLockMediator<StoreTransaction> localLockMediator; - private Duration lockExpiryTime; + private final Duration lockExpiryTimeMs; + private final Duration lockMaxWaitTimeMs; + private final Integer lockMaxRetries; HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) { this.storeManager = storeManager; @@ -96,7 +97,10 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore { this.columnFamilyBytes = columnFamily.getBytes(); this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); this.localLockMediator = llm; - this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE); + Configuration storageConfig = storeManager.getStorageConfig(); + this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE); + this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT); + this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY); } @Override @@ -128,14 +132,37 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore { KeyColumn lockID = new KeyColumn(key, column); logger.debug("Attempting to acquireLock on {} ", lockID); - final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS); - boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime)); - if (!locked) { - throw new 
PermanentLockingException("Could not lock the keyColumn " + lockID + " on CF {} " + Bytes.toString(columnFamilyBytes)); + int trialCount = 0; + // Always make at least one attempt: with a non-positive retry setting the loop below would be + // skipped entirely and updateLocks() would be called for a lock that was never acquired. + final int maxAttempts = Math.max(1, lockMaxRetries); + while (trialCount < maxAttempts) { + final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + final boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs)); + trialCount++; + if (!locked) { + // Sleeps and returns while attempts remain; throws PermanentLockingException on the last one. + handleLockFailure(txh, lockID, trialCount); + } else { + logger.debug("Acquired lock on {}, {}", lockID, txh); + break; + } } ((HBaseTransaction) txh).updateLocks(lockID, expectedValue); } + /** + * Handles a failed attempt to acquire {@code lockID}. + * <p> + * If fewer than {@code lockMaxRetries} attempts have been made, sleeps for {@code lockMaxWaitTimeMs} + * so the caller can try again; otherwise gives up. + * + * @param txh transaction that requested the lock (used in error messages) + * @param lockID key/column that could not be locked + * @param trialCount number of attempts made so far, starting at 1 + * @throws PermanentLockingException if the attempts are exhausted or the wait is interrupted + */ + void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException { + if (trialCount < lockMaxRetries) { + try { + Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + // Restore the interrupt status so code further up the stack can still observe it. + Thread.currentThread().interrupt(); + throw new PermanentLockingException( + "Interrupted while waiting for acquiring lock for transaction " + + txh + " lockID " + lockID + " on retry " + trialCount, e); + } + } else { + throw new PermanentLockingException("Could not lock the keyColumn " + + lockID + " on CF " + Bytes.toString(columnFamilyBytes) + + " after " + trialCount + " attempt(s)"); + } + } + @Override public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java ---------------------------------------------------------------------- diff --git a/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java new file mode 100644 index 0000000..7ed636a --- /dev/null +++
b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.thinkaurelius.titan.diskstorage.hbase; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.EntryMetaData; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException; +import com.thinkaurelius.titan.diskstorage.util.KeyColumn; +import com.thinkaurelius.titan.diskstorage.util.time.StandardDuration; +import com.thinkaurelius.titan.diskstorage.util.time.Timepoint; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.fail; + +public class HBaseKeyColumnValueStoreTest { + + @Mock + HBaseStoreManager storeManager; + + @Mock + ConnectionMask connectionMask; + + @Mock + LocalLockMediator localLockMediator; + + @Mock + StaticBuffer key; + + @Mock + StaticBuffer column; + + @Mock + StaticBuffer expectedValue; + + @Mock + HBaseTransaction transaction; + + @Mock + Configuration storageConfig; + + /** + * Initializes the @Mock fields and stubs the storage configuration shared by every test: + * 300 ms lock expiry, 10 ms wait between attempts and 3 lock attempts. + */ + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + when(storeManager.getMetaDataSchema("hbase")).thenReturn(new EntryMetaData[] {EntryMetaData.TIMESTAMP}); + when(storeManager.getStorageConfig()).thenReturn(storageConfig); + when(storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE)).thenReturn( + new StandardDuration(300L, TimeUnit.MILLISECONDS)); + when(storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT)).thenReturn( + new StandardDuration(10L, TimeUnit.MILLISECONDS)); + when(storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY)).thenReturn(3); + } + + /** Builds the store under test; its constructor reads the storage configuration stubbed in {@link #setup()}. */ + private HBaseKeyColumnValueStore createStore() { + return new HBaseKeyColumnValueStore(storeManager, connectionMask, "titan", "e", "hbase", localLockMediator); + } + + @Test + public void shouldSucceedInLockingIfLockMediatorSucceeds() throws BackendException { + KeyColumn lockID = new KeyColumn(key, column); + when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))). + thenReturn(true); + + createStore().acquireLock(key, column, expectedValue, transaction); + + verify(transaction).updateLocks(lockID, expectedValue); + verify(localLockMediator, times(1)).lock(eq(lockID), eq(transaction), any(Timepoint.class)); + } + + @Test + public void shouldRetryRightNumberOfTimesIfLockMediationFails() throws BackendException { + KeyColumn lockID = new KeyColumn(key, column); + when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))). + thenReturn(false).thenReturn(false).thenReturn(true); + + createStore().acquireLock(key, column, expectedValue, transaction); + + verify(transaction).updateLocks(lockID, expectedValue); + verify(localLockMediator, times(3)).lock(eq(lockID), eq(transaction), any(Timepoint.class)); + } + + @Test(expectedExceptions = PermanentLockingException.class) + public void shouldThrowExceptionAfterConfiguredRetriesIfLockMediationFails() throws BackendException { + KeyColumn lockID = new KeyColumn(key, column); + when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))). + thenReturn(false).thenReturn(false).thenReturn(false); + + createStore().acquireLock(key, column, expectedValue, transaction); + + fail("Should fail as lock could not be acquired after 3 retries."); + } +}
