[GOBBLIN-405] Fix race condition with access to immediately invalidated resources
Closes #2280 from htran1/broker_cache_race Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bde5bb1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bde5bb1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bde5bb1f Branch: refs/heads/0.12.0 Commit: bde5bb1f9d2eb310b6a16d52bad383eefaf0d75c Parents: 19b2d81 Author: Hung Tran <[email protected]> Authored: Wed Feb 7 10:19:45 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Feb 7 10:19:45 2018 -0800 ---------------------------------------------------------------------- .../apache/gobblin/broker/ResourceEntry.java | 17 ++++ .../publisher/DataPublisherFactoryTest.java | 48 +++++++++++ .../gobblin/broker/DefaultBrokerCache.java | 87 ++++++++++++++------ .../broker/ImmediatelyInvalidResourceEntry.java | 15 +++- 4 files changed, 141 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java index 6402391..cbdebe7 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java @@ -44,4 +44,21 @@ public interface ResourceEntry<T> extends SharedResourceFactoryResponse<T> { * key, blocking all requests for that key. As such, this method should be reasonably fast. */ void onInvalidate(); + + /** + * This method should guarantee that if all callers access the resource using this method, then the object is + * returned atomically with respect to any validity state change. 
+ * + * This is to avoid race conditions in cases where the state is changed when getting the resource. Some examples are + * resources that can only be used a certain number of times. + * + * @return null if the object is not valid, otherwise the valid object + */ + default T getResourceIfValid() { + if (isValid()) { + return getResource(); + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java index b2cd739..6f58a50 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java @@ -17,9 +17,16 @@ package org.apache.gobblin.publisher; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.testng.Assert; import org.testng.annotations.Test; @@ -94,6 +101,47 @@ public class DataPublisherFactoryTest { Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)); } + @Test() + public void testMultiThreadedGetNonThreadSafePublisher() + throws InterruptedException, ExecutionException, IOException { + SharedResourcesBroker broker = + SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(ConfigFactory.empty(), + SimpleScopeType.GLOBAL.defaultScopeInstance()); + + ExecutorService 
service = Executors.newFixedThreadPool(40); + List<Future<?>> futures = new ArrayList<>(); + + for (int i = 0; i < 100000; i++) { + futures.add(service.submit(new GetNonThreadSafePublisher(broker))); + } + + for (Future f: futures) { + f.get(); + } + service.shutdown(); + service.awaitTermination(100, TimeUnit.SECONDS); + } + + private static class GetNonThreadSafePublisher implements Runnable { + private final SharedResourcesBroker broker; + private static long count = 0; + + GetNonThreadSafePublisher(SharedResourcesBroker broker) { + this.broker = broker; + } + + @Override + public void run() { + try { + DataPublisher publisher1 = DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, this.broker); + Assert.assertNotNull(publisher1); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static class TestNonThreadsafeDataPublisher extends DataPublisher { public TestNonThreadsafeDataPublisher(State state) { super(state); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java index 6425bab..0c001f1 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Striped; import org.apache.gobblin.broker.iface.ScopeType; import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; import org.apache.gobblin.broker.iface.SharedResourceKey; import org.apache.gobblin.broker.iface.NoSuchScopeException; @@ -98,42 +99,78 @@ class 
DefaultBrokerCache<S extends ScopeType<S>> { } /** - * Get an object for the specified factory, key, scope, and broker. {@link DefaultBrokerCache} - * guarantees that calling this method for the same factory, key, and scope will return the same object. + * Get a scoped object from the cache. */ @SuppressWarnings(value = "unchecked") - <T, K extends SharedResourceKey> T getScoped(final SharedResourceFactory<T, K, S> factory, @Nonnull final K key, + <T, K extends SharedResourceKey> SharedResourceFactoryResponse<T> getScopedFromCache( + final SharedResourceFactory<T, K, S> factory, @Nonnull final K key, @Nonnull final ScopeWrapper<S> scope, final SharedResourcesBrokerImpl<S> broker) throws ExecutionException { - RawJobBrokerKey fullKey = new RawJobBrokerKey(scope, factory.getName(), key); Object obj = this.sharedResourceCache.get(fullKey, new Callable<Object>() { @Override public Object call() throws Exception { - return factory.createResource(broker.getScopedView(scope.getType()), broker.getConfigView(scope.getType(), key, factory.getName())); + return factory.createResource(broker.getScopedView(scope.getType()), broker.getConfigView(scope.getType(), key, + factory.getName())); } }); - if (obj instanceof ResourceCoordinate) { - ResourceCoordinate<T, K, S> resourceCoordinate = (ResourceCoordinate<T, K, S>) obj; - if (!SharedResourcesBrokerUtils.isScopeTypeAncestor((ScopeType) scope.getType(), ((ResourceCoordinate) obj).getScope())) { - throw new RuntimeException(String.format("%s returned an invalid coordinate: scope %s is not an ancestor of %s.", - factory.getName(), ((ResourceCoordinate) obj).getScope(), scope.getType())); - } - try { - return getScoped(resourceCoordinate.getFactory(), resourceCoordinate.getKey(), - broker.getWrappedScope(resourceCoordinate.getScope()), broker); - } catch (NoSuchScopeException nsse) { - throw new RuntimeException(String.format("%s returned an invalid coordinate: scope %s is not available.", - factory.getName(), 
resourceCoordinate.getScope().name()), nsse); - } - } else if (obj instanceof ResourceEntry) { - if (!((ResourceEntry) obj).isValid()) { - safeInvalidate(fullKey); - return getScoped(factory, key, scope, broker); + + return (SharedResourceFactoryResponse<T>)obj; + } + + /** + * Get an object for the specified factory, key, scope, and broker. {@link DefaultBrokerCache} + * guarantees that calling this method for the same factory, key, and scope will return the same object. + */ + @SuppressWarnings(value = "unchecked") + <T, K extends SharedResourceKey> T getScoped(final SharedResourceFactory<T, K, S> factory, @Nonnull final K key, + @Nonnull final ScopeWrapper<S> scope, final SharedResourcesBrokerImpl<S> broker) + throws ExecutionException { + SharedResourceFactory<T, K, S> currentFactory = factory; + K currentKey = key; + ScopeWrapper<S> currentScope = scope; + + Object obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker); + + // this loop is to continue looking up objects through redirection or reloading until a valid resource is found + while (true) { + if (obj instanceof ResourceCoordinate) { + ResourceCoordinate<T, K, S> resourceCoordinate = (ResourceCoordinate<T, K, S>) obj; + if (!SharedResourcesBrokerUtils.isScopeTypeAncestor((ScopeType) currentScope.getType(), ((ResourceCoordinate) obj).getScope())) { + throw new RuntimeException(String + .format("%s returned an invalid coordinate: scope %s is not an ancestor of %s.", currentFactory.getName(), + ((ResourceCoordinate) obj).getScope(), currentScope.getType())); + } + try { + obj = getScopedFromCache(resourceCoordinate.getFactory(), resourceCoordinate.getKey(), + broker.getWrappedScope(resourceCoordinate.getScope()), broker); + } catch (NoSuchScopeException nsse) { + throw new RuntimeException(String + .format("%s returned an invalid coordinate: scope %s is not available.", factory.getName(), + resourceCoordinate.getScope().name()), nsse); + } + } else if (obj instanceof ResourceEntry) { 
+ T resource = ((ResourceEntry<T>) obj).getResourceIfValid(); + + // valid resource found + if (resource != null) { + return resource; + } + + // resource is invalid. The lock in this block is to reduce the chance of starvation where a thread keeps + // getting objects that are invalidated by another thread. + Lock lock = this.invalidationLock.get(key); + try { + lock.lock(); + RawJobBrokerKey fullKey = new RawJobBrokerKey(currentScope, currentFactory.getName(), currentKey); + safeInvalidate(fullKey); + obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker); + } finally { + lock.unlock(); + } + } else { + throw new RuntimeException(String.format("Invalid response from %s: %s.", factory.getName(), obj.getClass())); } - return ((ResourceEntry<T>) obj).getResource(); - } else { - throw new RuntimeException(String.format("Invalid response from %s: %s.", factory.getName(), obj.getClass())); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java index b3f8502..ccb569c 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java @@ -36,7 +36,7 @@ public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> { } @Override - public T getResource() { + public synchronized T getResource() { // mark the object as invalid before returning so that a new one will be created on the next // request from the factory this.valid = false; @@ -53,4 +53,17 @@ public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> { 
public void onInvalidate() { // this type of resource cannot be closed on invalidation since the lifetime can't be determined } + + /** + * This method is synchronized so that the validity check and the validity change are atomic for callers of this method. + * @return the resource if it is still valid, otherwise null + */ + @Override + public synchronized T getResourceIfValid() { + if (this.valid) { + return getResource(); + } else { + return null; + } + } }
