Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 bd1247fce -> b02400061


AMBARI-17106 - Deadlock While Updating Stale Configuration Cache During Upgrade 
(jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b0240006
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b0240006
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b0240006

Branch: refs/heads/branch-2.4
Commit: b02400061522ef559f8d34022ee7a57b2d9e43ed
Parents: bd1247f
Author: Jonathan Hurley <[email protected]>
Authored: Tue Jun 7 23:44:22 2016 -0400
Committer: Jonathan Hurley <[email protected]>
Committed: Wed Jun 8 11:03:03 2016 -0400

----------------------------------------------------------------------
 .../ambari/server/state/ConfigHelper.java       | 133 ++++++++++++++++---
 .../apache/ambari/server/state/ConfigImpl.java  |  14 ++
 .../server/state/cluster/ClusterImpl.java       |   5 -
 .../ambari/server/state/ConfigHelperTest.java   |  14 +-
 4 files changed, 136 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b0240006/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
index 1bd0ba9..4feba62 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
@@ -29,6 +29,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 
@@ -51,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.persist.Transactional;
@@ -68,6 +72,18 @@ public class ConfigHelper {
   public static final String CLUSTER_DEFAULT_TAG = "tag";
   private final boolean STALE_CONFIGS_CACHE_ENABLED;
   private final int STALE_CONFIGS_CACHE_EXPIRATION_TIME;
+
+  /**
+   * A cache of which {@link ServiceComponentHost}s have stale configurations.
+   * This cache may be invalidated within the context of a running transaction
+   * which could potentially cause old data to be re-cached before the
+   * transaction has completed.
+   * <p/>
+   * As a result, all invalidations of this cache should be done on a separate
+   * thread using the {@link LockArea#STALE_CONFIG_CACHE}.
+   *
+   * @see #cacheInvalidationExecutor
+   */
   private final Cache<ServiceComponentHost, Boolean> staleConfigsCache;
 
   private static final Logger LOG =
@@ -97,6 +113,25 @@ public class ConfigHelper {
   public static final String FIRST_VERSION_TAG = "version1";
 
   /**
+   * A {@link ThreadFactory} for the {@link #cacheInvalidationExecutor}.
+   */
+  private final ThreadFactory cacheInvalidationThreadFactory = new 
ThreadFactoryBuilder().setNameFormat(
+      "ambari-stale-config-invalidator-%d").setDaemon(true).build();
+
+  /**
+   * Used to invalidate the {@link #staleConfigsCache} on a separate thread.
+   * This helps ensure that the {@link TransactionalLock} /
+   * {@link LockArea#STALE_CONFIG_CACHE} doesn't cause a deadlock with the
+   * "cluster global lock".
+   * <p/>
+   * Cache invalidations must happen after the transaction which invoked them
+   * has completed, otherwise it's possible to cache invalid data before the
+   * transaction is committed.
+   */
+  private final ExecutorService cacheInvalidationExecutor = 
Executors.newSingleThreadExecutor(
+      cacheInvalidationThreadFactory);
+
+  /**
    * Used to ensure that methods which rely on the completion of
    * {@link Transactional} can detect when they are able to run.
    *
@@ -449,34 +484,32 @@ public class ConfigHelper {
     }
 
     if (stale == null) {
-      ReadWriteLock lock = 
transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE);
-      lock.readLock().lock();
-
-      try {
-        stale = calculateIsStaleConfigs(sch, desiredConfigs);
-        staleConfigsCache.put(sch, stale);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cache configuration staleness for host {} and component 
{} as {}",
-              sch.getHostName(), sch.getServiceComponentName(), stale);
-        }
-      } finally {
-        lock.readLock().unlock();
+      stale = calculateIsStaleConfigs(sch, desiredConfigs);
+      staleConfigsCache.put(sch, stale);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cache configuration staleness for host {} and component {} 
as {}",
+            sch.getHostName(), sch.getServiceComponentName(), stale);
       }
     }
     return stale;
   }
 
   /**
-   * Invalidates cached isStale values for hostname
+   * Invalidates the stale configuration cache for all
+   * {@link ServiceComponentHost}s for the given host. This will execute the
+   * invalidation on a separate thread. The thread will attempt to acquire the
+   * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
+   * performed on a separate thread. This way, it won't interfere with any
+   * cluster global locks already acquired.
    *
    * @param hostname
    */
   public void invalidateStaleConfigsCache(String hostname) {
     try {
       for (Cluster cluster : clusters.getClustersForHost(hostname)) {
-        for (ServiceComponentHost sch : 
cluster.getServiceComponentHosts(hostname)) {
-          invalidateStaleConfigsCache(sch);
-        }
+        List<ServiceComponentHost> serviceComponentHosts = 
cluster.getServiceComponentHosts(hostname);
+        Runnable invalidationRunnable = new 
StaleConfigInvalidationRunnable(serviceComponentHosts);
+        cacheInvalidationExecutor.execute(invalidationRunnable);
       }
     } catch (AmbariException e) {
       LOG.warn("Unable to find clusters for host " + hostname);
@@ -484,19 +517,28 @@ public class ConfigHelper {
   }
 
   /**
-   * Invalidates isStale cache
+   * Invalidates the stale configuration cache for keys. This will execute the
+   * invalidation on a separate thread. The thread will attempt to acquire the
+   * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
+   * performed on a separate thread. This way, it won't interfere with any
+   * cluster global locks already acquired.
    */
   public void invalidateStaleConfigsCache() {
-    staleConfigsCache.invalidateAll();
+    StaleConfigInvalidationRunnable invalidateEverything = new StaleConfigInvalidationRunnable(); // no keys => all
+    cacheInvalidationExecutor.execute(invalidateEverything);
   }
 
   /**
-   * Invalidates cached isStale value for sch
-   *
-   * @param sch
+   * Invalidates the stale configuration cache for a single
+   * {@link ServiceComponentHost}. This will execute the invalidation on a
+   * separate thread. The thread will attempt to acquire the
+   * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
+   * performed on a separate thread. This way, it won't interfere with any
+   * cluster global locks already acquired.
    */
   public void invalidateStaleConfigsCache(ServiceComponentHost sch) {
-    staleConfigsCache.invalidate(sch);
+    List<ServiceComponentHost> singleKey = Collections.singletonList(sch);
+    cacheInvalidationExecutor.execute(new StaleConfigInvalidationRunnable(singleKey));
   }
 
   /**
@@ -1279,5 +1321,52 @@ public class ConfigHelper {
     return filename.substring(0, extIndex);
   }
 
+  /**
+   * Invalidates the {@link ConfigHelper#staleConfigsCache} after acquiring the
+   * write lock for {@link LockArea#STALE_CONFIG_CACHE}. The lock is required
+   * because the event which made the configuration stale (such as a new
+   * configuration being persisted) may not have committed its transaction yet.
+   */
+  private final class StaleConfigInvalidationRunnable implements Runnable {
+
+    /** The keys to invalidate, or {@code null} to invalidate every entry. */
+    private final List<ServiceComponentHost> m_keysToInvalidate;
+
+    /**
+     * Creates a runnable which invalidates every entry in the cache.
+     */
+    private StaleConfigInvalidationRunnable() {
+      m_keysToInvalidate = null;
+    }
+
+    /**
+     * Creates a runnable which invalidates only the specified keys; an empty
+     * list invalidates nothing. The list is read later on the invalidation
+     * thread, so callers should not modify it after handing it over.
+     *
+     * @param keysToInvalidate
+     *          the keys to invalidate in the cache.
+     */
+    private StaleConfigInvalidationRunnable(List<ServiceComponentHost> keysToInvalidate) {
+      m_keysToInvalidate = keysToInvalidate;
+    }
+
+    @Override
+    public void run() {
+      ReadWriteLock lock = transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE);
+      lock.writeLock().lock();
+      try {
+        // null means "invalidate everything"; an empty list (for example, a
+        // host with no components) must not wipe out the whole cache
+        if (null == m_keysToInvalidate) {
+          staleConfigsCache.invalidateAll();
+        } else if (!m_keysToInvalidate.isEmpty()) {
+          staleConfigsCache.invalidateAll(m_keysToInvalidate);
+        }
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/b0240006/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
index 713b1bb..92ec843 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
@@ -27,6 +27,9 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.ambari.annotations.TransactionalLock;
+import org.apache.ambari.annotations.TransactionalLock.LockArea;
+import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.events.ClusterConfigChangedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -347,8 +350,19 @@ public class ConfigImpl implements Config {
     persist(true);
   }
 
+  /**
+   * {@inheritDoc}
+   * <p/>
+   * This method will attempt a lock around the
+   * {@link LockArea#STALE_CONFIG_CACHE} in order to ensure that the stale
+   * configuration cache can only be invalidated once this transaction has been
+   * committed. Without this lock, it's possible that the stale config cache
+   * might be invalidated and then re-populated with old data before this
+   * transaction commits.
+   */
   @Override
   @Transactional
+  @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = 
LockType.WRITE)
   public void persist(boolean newConfig) {
     cluster.getClusterGlobalLock().writeLock().lock(); //null cluster is not 
expected, NPE anyway later in code
     try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/b0240006/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 9e9128e..3d2388e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -41,9 +41,6 @@ import javax.annotation.Nullable;
 import javax.persistence.EntityManager;
 import javax.persistence.RollbackException;
 
-import org.apache.ambari.annotations.TransactionalLock;
-import org.apache.ambari.annotations.TransactionalLock.LockArea;
-import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ConfigGroupNotFoundException;
 import org.apache.ambari.server.DuplicateResourceException;
@@ -2798,7 +2795,6 @@ public class ClusterImpl implements Cluster {
   }
 
   @Transactional
-  @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = 
LockType.WRITE)
   void selectConfig(String type, String tag, String user) {
     Collection<ClusterConfigMappingEntity> entities =
         clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId());
@@ -2827,7 +2823,6 @@ public class ClusterImpl implements Cluster {
   }
 
   @Transactional
-  @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = 
LockType.WRITE)
   ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, 
String serviceConfigVersionNote) {
 
     String serviceName = null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/b0240006/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
index b4f0c52..0ff143c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
@@ -23,6 +23,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,8 +40,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.persistence.EntityManager;
 
-import junit.framework.Assert;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -62,6 +61,7 @@ import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
 import org.apache.ambari.server.state.host.HostFactory;
 import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,13 +69,15 @@ import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.security.core.context.SecurityContextHolder;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 import com.google.inject.persist.Transactional;
-import org.springframework.security.core.context.SecurityContextHolder;
+
+import junit.framework.Assert;
 
 
 @RunWith(Enclosed.class)
@@ -198,6 +200,12 @@ public class ConfigHelperTest {
       managementController.updateClusters(new HashSet<ClusterRequest>() {{
         add(clusterRequest4);
       }}, null);
+
+      // instrument the asynchronous stale config invalidation to be 
synchronous
+      // so that tests pass
+      Field field = 
ConfigHelper.class.getDeclaredField("cacheInvalidationExecutor");
+      field.setAccessible(true);
+      field.set(configHelper, new SynchronousThreadPoolExecutor());
     }
 
     @After

Reply via email to