Repository: ambari Updated Branches: refs/heads/trunk b440ae7f5 -> 23aa6eaca
AMBARI-17106 - Deadlock While Updating Stale Configuration Cache During Upgrade (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/23aa6eac Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/23aa6eac Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/23aa6eac Branch: refs/heads/trunk Commit: 23aa6eacab0c19c689ee0888b3783bf16703309f Parents: b440ae7 Author: Jonathan Hurley <[email protected]> Authored: Tue Jun 7 23:44:22 2016 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Wed Jun 8 11:01:45 2016 -0400 ---------------------------------------------------------------------- .../ambari/server/state/ConfigHelper.java | 133 ++++++++++++++++--- .../apache/ambari/server/state/ConfigImpl.java | 14 ++ .../server/state/cluster/ClusterImpl.java | 5 - .../ambari/server/state/ConfigHelperTest.java | 14 +- 4 files changed, 136 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/23aa6eac/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java index 1bd0ba9..4feba62 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java @@ -29,6 +29,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; @@ -51,6 +54,7 @@ import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; @@ -68,6 +72,18 @@ public class ConfigHelper { public static final String CLUSTER_DEFAULT_TAG = "tag"; private final boolean STALE_CONFIGS_CACHE_ENABLED; private final int STALE_CONFIGS_CACHE_EXPIRATION_TIME; + + /** + * A cache of which {@link ServiceComponentHost}s have stale configurations. + * This cache may be invalidated within the context of a running transaction + * which could potentially cause old data to be re-cached before the + * transaction has completed. + * <p/> + * As a result, all invalidations of this cache should be done on a separate + * thread using the {@link LockArea#STALE_CONFIG_CACHE}. + * + * @see #cacheInvalidationExecutor + */ private final Cache<ServiceComponentHost, Boolean> staleConfigsCache; private static final Logger LOG = @@ -97,6 +113,25 @@ public class ConfigHelper { public static final String FIRST_VERSION_TAG = "version1"; /** + * A {@link ThreadFactory} for the {@link #cacheInvalidationExecutor}. + */ + private final ThreadFactory cacheInvalidationThreadFactory = new ThreadFactoryBuilder().setNameFormat( + "ambari-stale-config-invalidator-%d").setDaemon(true).build(); + + /** + * Used to invalidate the {@link #staleConfigsCache} on a separate thread. + * This helps ensure that the {@link TransactionalLock} / + * {@link LockArea#STALE_CONFIG_CACHE} doesn't cause a deadlock with the + * "cluster global lock". + * <p/> + * Cache invalidations must happen after the transaction which invoked them + * has completed, otherwise it's possible to cache invalid data before the + * transaction is committed. + */ + private final ExecutorService cacheInvalidationExecutor = Executors.newSingleThreadExecutor( + cacheInvalidationThreadFactory); + + /** * Used to ensure that methods which rely on the completion of * {@link Transactional} can detect when they are able to run. * @@ -449,34 +484,32 @@ public class ConfigHelper { } if (stale == null) { - ReadWriteLock lock = transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE); - lock.readLock().lock(); - - try { - stale = calculateIsStaleConfigs(sch, desiredConfigs); - staleConfigsCache.put(sch, stale); - if (LOG.isDebugEnabled()) { - LOG.debug("Cache configuration staleness for host {} and component {} as {}", - sch.getHostName(), sch.getServiceComponentName(), stale); - } - } finally { - lock.readLock().unlock(); + stale = calculateIsStaleConfigs(sch, desiredConfigs); + staleConfigsCache.put(sch, stale); + if (LOG.isDebugEnabled()) { + LOG.debug("Cache configuration staleness for host {} and component {} as {}", + sch.getHostName(), sch.getServiceComponentName(), stale); } } return stale; } /** - * Invalidates cached isStale values for hostname + * Invalidates the stale configuration cache for all + * {@link ServiceComponentHost}s for the given host. This will execute the + * invalidation on a separate thread. The thread will attempt to acquire the + * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is + * performed on a separate thread. This way, it won't interfere with any + * cluster global locks already acquired. * * @param hostname */ public void invalidateStaleConfigsCache(String hostname) { try { for (Cluster cluster : clusters.getClustersForHost(hostname)) { - for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostname)) { - invalidateStaleConfigsCache(sch); - } + List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostname); + Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(serviceComponentHosts); + cacheInvalidationExecutor.execute(invalidationRunnable); } } catch (AmbariException e) { LOG.warn("Unable to find clusters for host " + hostname); @@ -484,19 +517,28 @@ public class ConfigHelper { } /** - * Invalidates isStale cache + * Invalidates the stale configuration cache for keys. This will execute the + * invalidation on a separate thread. The thread will attempt to acquire the + * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is + * performed on a separate thread. This way, it won't interfere with any + * cluster global locks already acquired. */ public void invalidateStaleConfigsCache() { - staleConfigsCache.invalidateAll(); + Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(); + cacheInvalidationExecutor.execute(invalidationRunnable); } /** - * Invalidates cached isStale value for sch - * - * @param sch + * Invalidates the stale configuration cache for a single + * {@link ServiceComponentHost}. This will execute the invalidation on a + * separate thread. The thread will attempt to acquire the + * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is + * performed on a separate thread. This way, it won't interfere with any + * cluster global locks already acquired. */ public void invalidateStaleConfigsCache(ServiceComponentHost sch) { - staleConfigsCache.invalidate(sch); + Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(Collections.singletonList(sch)); + cacheInvalidationExecutor.execute(invalidationRunnable); } /** @@ -1279,5 +1321,52 @@ public class ConfigHelper { return filename.substring(0, extIndex); } + /** + * Invalidates the {@link ConfigHelper#staleConfigsCache} after acquiring a + * lock around {@link LockArea#STALE_CONFIG_CACHE}. It is necessary to acquire + * this lock since the event which caused the config to become stale, such as + * a new configuration being created, may not have had its transaction + * committed yet. + */ + private final class StaleConfigInvalidationRunnable implements Runnable { + + private final List<ServiceComponentHost> m_keysToInvalidate; + + /** + * Constructor. + * + */ + private StaleConfigInvalidationRunnable() { + m_keysToInvalidate = null; + } + + /** + * Constructor. + * + * @param keysToInvalidate + * the keys to invalidate in the cache. + */ + private StaleConfigInvalidationRunnable(List<ServiceComponentHost> keysToInvalidate) { + m_keysToInvalidate = keysToInvalidate; + } + + /** + * {@inheritDoc} + */ + @Override + public void run() { + ReadWriteLock lock = transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE); + lock.writeLock().lock(); + try { + if (null == m_keysToInvalidate || m_keysToInvalidate.isEmpty()) { + staleConfigsCache.invalidateAll(); + } else { + staleConfigsCache.invalidateAll(m_keysToInvalidate); + } + } finally { + lock.writeLock().unlock(); + } + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/23aa6eac/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java index b590c56..663556c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java @@ -27,6 +27,9 @@ import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ambari.annotations.TransactionalLock; +import org.apache.ambari.annotations.TransactionalLock.LockArea; +import org.apache.ambari.annotations.TransactionalLock.LockType; import org.apache.ambari.server.events.ClusterConfigChangedEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.dao.ClusterDAO; @@ -356,8 +359,19 @@ public class ConfigImpl implements Config { persist(true); } + /** + * {@inheritDoc} + * <p/> + * This method will attempt a lock around the + * {@link LockArea#STALE_CONFIG_CACHE} in order to ensure that the stale + * configuration cache can only be invalidated once this transaction has been + * committed. Without this lock, it's possible that the stale config cache + * might be invalidated and then re-populated with old data before this + * transaction commits. + */ @Override @Transactional + @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = LockType.WRITE) public void persist(boolean newConfig) { cluster.getClusterGlobalLock().writeLock().lock(); //null cluster is not expected, NPE anyway later in code try { http://git-wip-us.apache.org/repos/asf/ambari/blob/23aa6eac/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 9e9128e..3d2388e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -41,9 +41,6 @@ import javax.annotation.Nullable; import javax.persistence.EntityManager; import javax.persistence.RollbackException; -import org.apache.ambari.annotations.TransactionalLock; -import org.apache.ambari.annotations.TransactionalLock.LockArea; -import org.apache.ambari.annotations.TransactionalLock.LockType; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ConfigGroupNotFoundException; import org.apache.ambari.server.DuplicateResourceException; @@ -2798,7 +2795,6 @@ public class ClusterImpl implements Cluster { } @Transactional - @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = LockType.WRITE) void selectConfig(String type, String tag, String user) { Collection<ClusterConfigMappingEntity> entities = clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId()); @@ -2827,7 +2823,6 @@ public class ClusterImpl implements Cluster { } @Transactional - @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = LockType.WRITE) ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, String serviceConfigVersionNote) { String serviceName = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/23aa6eac/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java index b4f0c52..0ff143c 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java @@ -23,6 +23,7 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -39,8 +40,6 @@ import java.util.concurrent.locks.ReentrantLock; import javax.persistence.EntityManager; -import junit.framework.Assert; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.RequestFactory; import org.apache.ambari.server.api.services.AmbariMetaInfo; @@ -62,6 +61,7 @@ import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.state.configgroup.ConfigGroupFactory; import org.apache.ambari.server.state.host.HostFactory; import org.apache.ambari.server.state.stack.OsFamily; +import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -69,13 +69,15 @@ import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.security.core.context.SecurityContextHolder; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.persist.PersistService; import com.google.inject.persist.Transactional; -import org.springframework.security.core.context.SecurityContextHolder; + +import junit.framework.Assert; @RunWith(Enclosed.class) @@ -198,6 +200,12 @@ public class ConfigHelperTest { managementController.updateClusters(new HashSet<ClusterRequest>() {{ add(clusterRequest4); }}, null); + + // instrument the asynchronous stale config invalidation to be synchronous + // so that tests pass + Field field = ConfigHelper.class.getDeclaredField("cacheInvalidationExecutor"); + field.setAccessible(true); + field.set(configHelper, new SynchronousThreadPoolExecutor()); } @After
