This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0600772  use a non-concurrent map for lookups-cached-global unless 
incremental updates are actually required (#12293)
0600772 is described below

commit 0600772cceec633a7754a68d6b4ff5ed21af8795
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Mar 8 21:54:25 2022 -0800

    use a non-concurrent map for lookups-cached-global unless incremental 
updates are actually required (#12293)
    
    * use a non-concurrent map for lookups-cached-global unless incremental 
updates are actually required
    * adjustments
    * fix test
---
 .../query/lookup/KafkaLookupExtractorFactory.java  |   4 +-
 .../query/lookup/namespace/CacheGenerator.java     |  15 +-
 .../lookup/namespace/JdbcCacheGenerator.java       |  27 +-
 .../lookup/namespace/StaticMapCacheGenerator.java  |  12 +-
 .../server/lookup/namespace/UriCacheGenerator.java |  12 +-
 .../lookup/namespace/cache/CacheHandler.java       |  13 +-
 .../server/lookup/namespace/cache/CacheProxy.java  |  43 ----
 .../lookup/namespace/cache/CacheScheduler.java     |  60 ++---
 .../cache/NamespaceExtractionCacheManager.java     |  39 +--
 .../OffHeapNamespaceExtractionCacheManager.java    |  52 +++-
 .../OnHeapNamespaceExtractionCacheManager.java     |  35 ++-
 .../lookup/namespace/JdbcCacheGeneratorTest.java   |  13 +-
 .../namespace/NamespacedExtractorModuleTest.java   |  17 +-
 .../namespace/StaticMapCacheGeneratorTest.java     |  22 +-
 .../lookup/namespace/UriCacheGeneratorTest.java    | 139 +++++------
 .../lookup/namespace/cache/CacheSchedulerTest.java |  29 +--
 .../cache/JdbcExtractionNamespaceTest.java         | 272 +++++++++------------
 .../NamespaceExtractionCacheManagersTest.java      |  25 +-
 .../OnHeapNamespaceExtractionCacheManagerTest.java |  34 ++-
 19 files changed, 412 insertions(+), 451 deletions(-)

diff --git 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index a50402a..f3d2c1e 100644
--- 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -53,7 +53,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -154,8 +153,9 @@ public class KafkaLookupExtractorFactory implements 
LookupExtractorFactory
 
       final String topic = getKafkaTopic();
       LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, 
factoryId);
+      // this creates a ConcurrentMap
       cacheHandler = cacheManager.createCache();
-      final ConcurrentMap<String, String> map = cacheHandler.getCache();
+      final Map<String, String> map = cacheHandler.getCache();
       mapRef.set(map);
 
 
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
index 243d990..280e03e 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.lookup.namespace;
 
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 
 import javax.annotation.Nullable;
@@ -31,23 +32,21 @@ public interface CacheGenerator<T extends 
ExtractionNamespace>
   /**
    * If the lookup source, encapsulated by this {@code CacheGenerator}, has 
data newer than identified
    * by the given {@code lastVersion} (which is null at the first run of this 
method, or the version from the previous
-   * run), this method creates a new {@code CacheScheduler.VersionedCache} 
with {@link
-   * CacheScheduler#createVersionedCache}, called on the given {@code 
scheduler}, with the version string identifying
-   * the current version of lookup source, populates the created {@code 
VersionedCache} and returns it. If the lookup
-   * source is up-to-date, this methods returns null.
+   * run), this method populates a {@link CacheHandler} and returns the 
version string identifying the current version
+   * of lookup source, If the lookup source is up-to-date, this methods 
returns null.
    *
    * @param namespace The ExtractionNamespace for which to generate cache.
    * @param id An object uniquely corresponding to the {@link 
CacheScheduler.Entry}, for which this generateCache()
    *           method is called. Also it has the same toString() 
representation, that is useful for logging
    * @param lastVersion The version which was last cached
-   * @param scheduler Should be used only to call {@link 
CacheScheduler#createVersionedCache}.
-   * @return the new cache along with the new version, or null if the last 
version is up-to-date.
+   * @param cache a cache to populate
+   * @return the new version, or null if the last version is up-to-date.
    */
   @Nullable
-  CacheScheduler.VersionedCache generateCache(
+  String generateCache(
       T namespace,
       CacheScheduler.EntryImpl<T> id,
       String lastVersion,
-      CacheScheduler scheduler
+      CacheHandler cache
   ) throws Exception;
 }
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
index 3a73eee..ba35a5f 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.lookup.namespace.CacheGenerator;
 import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import org.apache.druid.utils.JvmUtils;
 import org.skife.jdbi.v2.DBI;
@@ -38,7 +39,6 @@ import org.skife.jdbi.v2.util.TimestampMapper;
 
 import javax.annotation.Nullable;
 import java.sql.Timestamp;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -57,11 +57,11 @@ public final class JdbcCacheGenerator implements 
CacheGenerator<JdbcExtractionNa
 
   @Override
   @Nullable
-  public CacheScheduler.VersionedCache generateCache(
+  public String generateCache(
       final JdbcExtractionNamespace namespace,
       final CacheScheduler.EntryImpl<JdbcExtractionNamespace> entryId,
       final String lastVersion,
-      final CacheScheduler scheduler
+      final CacheHandler cache
   )
   {
     final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : 
Long.parseLong(lastVersion);
@@ -76,10 +76,7 @@ public final class JdbcCacheGenerator implements 
CacheGenerator<JdbcExtractionNa
     }
     catch (UnableToObtainConnectionException e) {
       if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
-        throw new ISE(
-            e,
-            JDBC_DRIVER_JAR_FILES_MISSING_ERROR
-        );
+        throw new ISE(e, JDBC_DRIVER_JAR_FILES_MISSING_ERROR);
       } else {
         throw e;
       }
@@ -94,16 +91,15 @@ public final class JdbcCacheGenerator implements 
CacheGenerator<JdbcExtractionNa
     } else {
       newVersion = StringUtils.format("%d", dbQueryStart);
     }
-    final CacheScheduler.VersionedCache versionedCache = 
scheduler.createVersionedCache(entryId, newVersion);
 
     final long startNs = System.nanoTime();
     try (
         Handle handle = getHandle(entryId, namespace);
-        ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle, 
namespace)) {
-      final Map<String, String> cache = versionedCache.getCache();
+        ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle, 
namespace)
+    ) {
       final MapPopulator.PopulateResult populateResult = 
MapPopulator.populateAndWarnAtByteLimit(
           pairs,
-          cache,
+          cache.getCache(),
           (long) (MAX_MEMORY * namespace.getMaxHeapPercentage() / 100.0),
           null == entryId ? null : entryId.toString()
       );
@@ -115,21 +111,18 @@ public final class JdbcCacheGenerator implements 
CacheGenerator<JdbcExtractionNa
           entryId,
           duration
       );
-      return versionedCache;
+      return newVersion;
     }
     catch (UnableToObtainConnectionException e) {
       if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
-        throw new ISE(
-            e,
-            JDBC_DRIVER_JAR_FILES_MISSING_ERROR
-        );
+        throw new ISE(e, JDBC_DRIVER_JAR_FILES_MISSING_ERROR);
       } else {
         throw e;
       }
     }
     catch (Throwable t) {
       try {
-        versionedCache.close();
+        cache.close();
       }
       catch (Exception e) {
         t.addSuppressed(e);
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
index f291ae9..999f77c 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.lookup.namespace;
 
 import org.apache.druid.query.lookup.namespace.CacheGenerator;
 import org.apache.druid.query.lookup.namespace.StaticMapExtractionNamespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 
 import javax.annotation.Nullable;
@@ -32,11 +33,11 @@ public final class StaticMapCacheGenerator implements 
CacheGenerator<StaticMapEx
 
   @Override
   @Nullable
-  public CacheScheduler.VersionedCache generateCache(
+  public String generateCache(
       final StaticMapExtractionNamespace namespace,
       final CacheScheduler.EntryImpl<StaticMapExtractionNamespace> id,
       final String lastVersion,
-      final CacheScheduler scheduler
+      final CacheHandler cache
   )
   {
     if (lastVersion != null) {
@@ -46,14 +47,13 @@ public final class StaticMapCacheGenerator implements 
CacheGenerator<StaticMapEx
           "StaticMapCacheGenerator could only be configured for a namespace 
which is scheduled "
           + "to be updated once, not periodically. Last version: `" + 
lastVersion + "`");
     }
-    CacheScheduler.VersionedCache versionedCache = 
scheduler.createVersionedCache(id, version);
     try {
-      versionedCache.getCache().putAll(namespace.getMap());
-      return versionedCache;
+      cache.getCache().putAll(namespace.getMap());
+      return version;
     }
     catch (Throwable t) {
       try {
-        versionedCache.close();
+        cache.close();
       }
       catch (Exception e) {
         t.addSuppressed(e);
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
index f774cec..3c10ef6 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.lookup.namespace.CacheGenerator;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
 import org.apache.druid.segment.loading.URIDataPuller;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import org.apache.druid.utils.CompressionUtils;
 import org.apache.druid.utils.JvmUtils;
@@ -62,11 +63,11 @@ public final class UriCacheGenerator implements 
CacheGenerator<UriExtractionName
 
   @Override
   @Nullable
-  public CacheScheduler.VersionedCache generateCache(
+  public String generateCache(
       final UriExtractionNamespace extractionNamespace,
       final CacheScheduler.EntryImpl<UriExtractionNamespace> entryId,
       @Nullable final String lastVersion,
-      final CacheScheduler scheduler
+      final CacheHandler cache
   ) throws Exception
   {
     final boolean doSearch = extractionNamespace.getUriPrefix() != null;
@@ -143,14 +144,13 @@ public final class UriCacheGenerator implements 
CacheGenerator<UriExtractionName
             }
           };
 
-          final CacheScheduler.VersionedCache versionedCache = 
scheduler.createVersionedCache(entryId, version);
           try {
             final long startNs = System.nanoTime();
             final MapPopulator.PopulateResult populateResult = new 
MapPopulator<>(
                 extractionNamespace.getNamespaceParseSpec().getParser()
             ).populateAndWarnAtByteLimit(
                 source,
-                versionedCache.getCache(),
+                cache.getCache(),
                 (long) (MAX_MEMORY * 
extractionNamespace.getMaxHeapPercentage() / 100.0),
                 null == entryId ? null : entryId.toString()
             );
@@ -163,11 +163,11 @@ public final class UriCacheGenerator implements 
CacheGenerator<UriExtractionName
                 entryId,
                 duration
             );
-            return versionedCache;
+            return version;
           }
           catch (Throwable t) {
             try {
-              versionedCache.close();
+              cache.close();
             }
             catch (Exception e) {
               t.addSuppressed(e);
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
index 567a944..58a4a99 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
@@ -21,17 +21,18 @@ package org.apache.druid.server.lookup.namespace.cache;
 
 import org.apache.druid.java.util.common.logger.Logger;
 
-import java.util.concurrent.ConcurrentMap;
+import java.io.Closeable;
+import java.util.Map;
 
-public final class CacheHandler implements AutoCloseable
+public final class CacheHandler implements AutoCloseable, Closeable
 {
   private static final Logger log = new Logger(CacheHandler.class);
 
   private final NamespaceExtractionCacheManager cacheManager;
-  private final ConcurrentMap<String, String> cache;
+  private final Map<String, String> cache;
   final Object id;
 
-  CacheHandler(NamespaceExtractionCacheManager cacheManager, 
ConcurrentMap<String, String> cache, Object id)
+  CacheHandler(NamespaceExtractionCacheManager cacheManager, Map<String, 
String> cache, Object id)
   {
     log.debug("Creating %s", super.toString());
     this.cacheManager = cacheManager;
@@ -39,7 +40,7 @@ public final class CacheHandler implements AutoCloseable
     this.id = id;
   }
 
-  public ConcurrentMap<String, String> getCache()
+  public Map<String, String> getCache()
   {
     return cache;
   }
@@ -48,7 +49,7 @@ public final class CacheHandler implements AutoCloseable
   public void close()
   {
     cacheManager.disposeCache(this);
-    // Log statement after disposeCache(), because logging may fail (e. g. in 
shutdown hooks)
+    // Log statement after disposeCache(), because logging may fail (e.g. in 
shutdown hooks)
     log.debug("Closed %s", super.toString());
   }
 }
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheProxy.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheProxy.java
deleted file mode 100644
index de23853..0000000
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheProxy.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.lookup.namespace.cache;
-
-import com.google.common.collect.ForwardingConcurrentMap;
-
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Used in {@link OffHeapNamespaceExtractionCacheManager#createCache()}
- */
-final class CacheProxy extends ForwardingConcurrentMap<String, String>
-{
-  private final ConcurrentMap<String, String> delegate;
-
-  CacheProxy(ConcurrentMap<String, String> delegate)
-  {
-    this.delegate = delegate;
-  }
-
-  @Override
-  protected ConcurrentMap<String, String> delegate()
-  {
-    return delegate;
-  }
-}
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
index fb2e229..43d85d1 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
@@ -145,7 +145,7 @@ public final class CacheScheduler
   {
     private final T namespace;
     private final String asString;
-    private final AtomicReference<CacheState> cacheStateHolder = new 
AtomicReference<CacheState>(NoCache.CACHE_NOT_INITIALIZED);
+    private final AtomicReference<CacheState> cacheStateHolder = new 
AtomicReference<>(NoCache.CACHE_NOT_INITIALIZED);
     private final Future<?> updaterFuture;
     private final Cleaners.Cleanable entryCleanable;
     private final CacheGenerator<T> cacheGenerator;
@@ -169,27 +169,13 @@ public final class CacheScheduler
 
     private Cleaners.Cleanable createCleaner(Entry<T> entry)
     {
-      return Cleaners.register(entry, new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          closeFromCleaner();
-        }
-      });
+      return Cleaners.register(entry, this::closeFromCleaner);
     }
 
     private Future<?> schedule(final T namespace)
     {
       final long updateMs = namespace.getPollMs();
-      Runnable command = new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          updateCache();
-        }
-      };
+      Runnable command = this::updateCache;
       if (updateMs > 0) {
         return 
cacheManager.scheduledExecutorService().scheduleAtFixedRate(command, 0, 
updateMs, TimeUnit.MILLISECONDS);
       } else {
@@ -224,12 +210,20 @@ public final class CacheScheduler
     private void tryUpdateCache(String currentVersion) throws Exception
     {
       boolean updatedCacheSuccessfully = false;
-      VersionedCache newVersionedCache = null;
+      CacheHandler newCache = null;
       try {
-        newVersionedCache = cacheGenerator.generateCache(namespace, this, 
currentVersion, CacheScheduler.this
+        updatesStarted.incrementAndGet();
+        newCache = CacheScheduler.this.cacheManager.allocateCache();
+        final String newVersion = cacheGenerator.generateCache(
+            namespace,
+            this,
+            currentVersion,
+            newCache
         );
-        if (newVersionedCache != null) {
-          CacheState previousCacheState = swapCacheState(newVersionedCache);
+        if (newVersion != null) {
+          newCache = cacheManager.attachCache(newCache);
+          final VersionedCache newVersionedCache = new 
VersionedCache(String.valueOf(this), newVersion, newCache);
+          final CacheState previousCacheState = 
swapCacheState(newVersionedCache);
           if (previousCacheState != NoCache.ENTRY_CLOSED) {
             updatedCacheSuccessfully = true;
             if (previousCacheState instanceof VersionedCache) {
@@ -246,8 +240,8 @@ public final class CacheScheduler
       }
       catch (Throwable t) {
         try {
-          if (newVersionedCache != null && !updatedCacheSuccessfully) {
-            newVersionedCache.close();
+          if (newCache != null && !updatedCacheSuccessfully) {
+            newCache.close();
           }
           log.error(t, "Failed to update %s", this);
         }
@@ -381,16 +375,16 @@ public final class CacheScheduler
     ENTRY_CLOSED
   }
 
-  public final class VersionedCache implements CacheState, AutoCloseable
+  public static final class VersionedCache implements CacheState, AutoCloseable
   {
     final String entryId;
     final CacheHandler cacheHandler;
     final String version;
 
-    private VersionedCache(String entryId, String version)
+    private VersionedCache(String entryId, String version, CacheHandler cache)
     {
       this.entryId = entryId;
-      this.cacheHandler = cacheManager.createCache();
+      this.cacheHandler = cache;
       this.version = version;
     }
 
@@ -458,20 +452,6 @@ public final class CacheScheduler
     );
   }
 
-  /**
-   * This method should be used from {@link CacheGenerator#generateCache} 
implementations, to obtain a {@link
-   * VersionedCache} to be returned.
-   *
-   * @param entryId an object uniquely corresponding to the {@link 
CacheScheduler.Entry}, for which VersionedCache is
-   *                created
-   * @param version version, associated with the cache
-   */
-  public VersionedCache createVersionedCache(@Nullable EntryImpl<? extends 
ExtractionNamespace> entryId, String version)
-  {
-    updatesStarted.incrementAndGet();
-    return new VersionedCache(String.valueOf(entryId), version);
-  }
-
   @VisibleForTesting
   long updatesStarted()
   {
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
index 867822c..268a193 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
@@ -61,24 +61,20 @@ public abstract class NamespaceExtractionCacheManager
     );
     ExecutorServices.manageLifecycle(lifecycle, scheduledExecutorService);
     scheduledExecutorService.scheduleAtFixedRate(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              monitor(serviceEmitter);
-            }
-            catch (Exception e) {
-              log.error(e, "Error emitting namespace stats");
-              if (Thread.currentThread().isInterrupted()) {
-                throw new RuntimeException(e);
-              }
+        () -> {
+          try {
+            monitor(serviceEmitter);
+          }
+          catch (Exception e) {
+            log.error(e, "Error emitting namespace stats");
+            if (Thread.currentThread().isInterrupted()) {
+              throw new RuntimeException(e);
             }
           }
         },
         1,
-        10, TimeUnit.MINUTES
+        10,
+        TimeUnit.MINUTES
     );
   }
 
@@ -99,8 +95,23 @@ public abstract class NamespaceExtractionCacheManager
     return scheduledExecutorService.awaitTermination(time, unit);
   }
 
+  /**
+   * Creates a thread-safe, permanently mutable cache. Use this method if you 
need a cache which can be incrementally
+   * updated, indefinitely, at the cost of the overhead of synchronization
+   */
   public abstract CacheHandler createCache();
 
+  /**
+   * Create a cache that is intended to be populated before use, and then 
passed to {@link #attachCache(CacheHandler)}.
+   */
+  public abstract CacheHandler allocateCache();
+
+  /**
+   * Attach a cache created with {@link #allocateCache()} to the cache 
manager. The cache return from this method
+   * should be treated as read-only (though some implmentations may still 
allow modification, it is not safe to do so).
+   */
+  public abstract CacheHandler attachCache(CacheHandler cache);
+
   abstract void disposeCache(CacheHandler cacheHandler);
 
   abstract int cacheCount();
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
index 51a0f79..419182d 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.lookup.namespace.cache;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.ForwardingConcurrentMap;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.Cleaners;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@@ -54,11 +55,13 @@ public class OffHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionC
      *
      * <p>In case of actual race, we don't wait in those methods until the 
other one, which manages to switch this flag
      * first, completes. This could result into the situation that neither one 
completes, if the JVM is shutting down
-     * and the thread from which {@link Cleaners.Cleanable#clean()} 
(delegating to {@link #run()}) is called started the disposal
-     * operation, then more deterministic shutdown hook / lifecycle.stop(), 
which may call {@link #disposeManually()}
-     * completed early, and then the whole process shuts down before {@link 
Cleaners.Cleanable#clean()} completes, because shutdown
-     * is not blocked by it. However this should be harmless because anyway we 
remove the whole MapDB's file in
-     * lifecycle.stop() (see {@link 
OffHeapNamespaceExtractionCacheManager#OffHeapNamespaceExtractionCacheManager}).
+     * and the thread from which {@link Cleaners.Cleanable#clean()} 
(delegating to {@link #run()}) is called started
+     * the disposal operation, then more deterministic shutdown hook / 
lifecycle.stop(), which may call
+     * {@link #disposeManually()} completed early, and then the whole process 
shuts down before
+     * {@link Cleaners.Cleanable#clean()} completes, because shutdown is not 
blocked by it. However this should be
+     * harmless because anyway we remove the whole MapDB's file in 
lifecycle.stop()
+     * (see {@link 
OffHeapNamespaceExtractionCacheManager#OffHeapNamespaceExtractionCacheManager}).
+     *
      * However if we persist off-heap DB between JVM runs, this decision 
should be revised.
      */
     final AtomicBoolean disposed = new AtomicBoolean(false);
@@ -69,8 +72,8 @@ public class OffHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionC
     }
 
     /**
-     * To be called by the JVM via {@link Cleaners.Cleanable#clean()}. The 
only difference from {@link #disposeManually()} is
-     * exception treatment.
+     * To be called by the JVM via {@link Cleaners.Cleanable#clean()}. The 
only difference from
+     * {@link #disposeManually()} is exception treatment.
      */
     @Override
     public void run()
@@ -127,6 +130,22 @@ public class OffHeapNamespaceExtractionCacheManager 
extends NamespaceExtractionC
     }
   }
 
+  private static final class CacheProxy extends 
ForwardingConcurrentMap<String, String>
+  {
+    private final ConcurrentMap<String, String> delegate;
+
+    CacheProxy(ConcurrentMap<String, String> delegate)
+    {
+      this.delegate = delegate;
+    }
+
+    @Override
+    protected ConcurrentMap<String, String> delegate()
+    {
+      return delegate;
+    }
+  }
+
   private final DB mmapDB;
   private final File tmpFile;
   private AtomicLong mapDbKeyCounter = new AtomicLong(0);
@@ -208,9 +227,9 @@ public class OffHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionC
       }
     }
     MapDbCacheDisposer cacheDisposer = new MapDbCacheDisposer(mapDbKey);
-    // Cleaner is "the second level of defence". Normally all users of 
createCache() must call disposeCache() with
-    // the returned CacheHandler instance manually. But if they don't do this 
for whatever reason, JVM will cleanup
-    // the cache itself.
+    // Cleaner is "the second level of defence". Normally all users of 
createMutableCache() must call disposeCache()
+    // with the returned CacheHandler instance manually. But if they don't do 
this for whatever reason, JVM will
+    // cleanup the cache itself.
     Cleaners.Cleanable cleanable = Cleaners.register(cache, cacheDisposer);
     MapDbCacheDisposerAndCleaner disposerAndCleaner = new 
MapDbCacheDisposerAndCleaner(
         cacheDisposer,
@@ -220,6 +239,19 @@ public class OffHeapNamespaceExtractionCacheManager 
extends NamespaceExtractionC
   }
 
   @Override
+  public CacheHandler allocateCache()
+  {
+    return createCache();
+  }
+
+  @Override
+  public CacheHandler attachCache(CacheHandler cache)
+  {
+    // nothing to do, allocate is create, no specialized implementation for 
populate then read-only pattern
+    return cache;
+  }
+
+  @Override
   void disposeCache(CacheHandler cacheHandler)
   {
     MapDbCacheDisposerAndCleaner disposerAndCleaner = 
(MapDbCacheDisposerAndCleaner) cacheHandler.id;
diff --git 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
index 9c6871e..287d927 100644
--- 
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
+++ 
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
 
 import java.lang.ref.WeakReference;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -50,9 +51,7 @@ public class OnHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionCa
    * <p>{@link WeakReference} doesn't override Object's identity equals() and 
hashCode(), so effectively this map plays
    * like concurrent {@link java.util.IdentityHashMap}.
    */
-  private final Set<WeakReference<ConcurrentMap<String, String>>> caches = 
Collections.newSetFromMap(
-      new ConcurrentHashMap<WeakReference<ConcurrentMap<String, String>>, 
Boolean>()
-  );
+  private final Set<WeakReference<Map<String, String>>> caches = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   @Inject
   public OnHeapNamespaceExtractionCacheManager(
@@ -66,7 +65,7 @@ public class OnHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionCa
 
   private void expungeCollectedCaches()
   {
-    for (Iterator<WeakReference<ConcurrentMap<String, String>>> iterator = 
caches.iterator(); iterator.hasNext(); ) {
+    for (Iterator<WeakReference<Map<String, String>>> iterator = 
caches.iterator(); iterator.hasNext(); ) {
       WeakReference<?> cacheRef = iterator.next();
       if (cacheRef.get() == null) {
         // This may not necessarily mean leak of CacheHandler, because 
disposeCache() may be called concurrently with
@@ -85,13 +84,35 @@ public class OnHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionCa
   public CacheHandler createCache()
   {
     ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
-    WeakReference<ConcurrentMap<String, String>> cacheRef = new 
WeakReference<>(cache);
+    WeakReference<Map<String, String>> cacheRef = new WeakReference<>(cache);
     expungeCollectedCaches();
     caches.add(cacheRef);
     return new CacheHandler(this, cache, cacheRef);
   }
 
   @Override
+  public CacheHandler allocateCache()
+  {
+    Map<String, String> cache = new HashMap<>();
+    // untracked, but disposing will explode if we don't create a weak 
reference here
+    return new CacheHandler(this, cache, new WeakReference<>(cache));
+  }
+
+  @Override
+  public CacheHandler attachCache(CacheHandler cache)
+  {
+    if (caches.contains((WeakReference<Map<String, String>>) cache.id)) {
+      throw new ISE("cache [%s] is already attached", cache.id);
+    }
+    // this cache is not thread-safe, make sure nothing ever writes to it
+    Map<String, String> immutable = 
Collections.unmodifiableMap(cache.getCache());
+    WeakReference<Map<String, String>> cacheRef = new 
WeakReference<>(immutable);
+    expungeCollectedCaches();
+    caches.add(cacheRef);
+    return new CacheHandler(this, immutable, cacheRef);
+  }
+
+  @Override
   void disposeCache(CacheHandler cacheHandler)
   {
     if (!(cacheHandler.id instanceof WeakReference)) {
@@ -113,8 +134,8 @@ public class OnHeapNamespaceExtractionCacheManager extends 
NamespaceExtractionCa
     long numEntries = 0;
     long size = 0;
     expungeCollectedCaches();
-    for (WeakReference<ConcurrentMap<String, String>> cacheRef : caches) {
-      final ConcurrentMap<String, String> cache = cacheRef.get();
+    for (WeakReference<Map<String, String>> cacheRef : caches) {
+      final Map<String, String> cache = cacheRef.get();
       if (cache == null) {
         continue;
       }
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
index 2f6463a..ff27b50 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
@@ -26,6 +26,7 @@ import 
org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
 import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
+import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
 import 
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
@@ -50,10 +51,16 @@ public class JdbcCacheGeneratorTest
 
   private static final ServiceEmitter SERVICE_EMITTER = new 
NoopServiceEmitter();
 
+  private static final NamespaceExtractionCacheManager CACHE_MANAGER = new 
OnHeapNamespaceExtractionCacheManager(
+      new Lifecycle(),
+      SERVICE_EMITTER,
+      new NamespaceExtractionConfig()
+  );
+
   private static final CacheScheduler SCHEDULER = new CacheScheduler(
       SERVICE_EMITTER,
       Collections.emptyMap(),
-      new OnHeapNamespaceExtractionCacheManager(new Lifecycle(), 
SERVICE_EMITTER, new NamespaceExtractionConfig())
+      CACHE_MANAGER
   );
 
   private static final String LAST_VERSION = "1";
@@ -84,7 +91,7 @@ public class JdbcCacheGeneratorTest
     exception.expect(IllegalStateException.class);
     exception.expectMessage(MISSING_JDB_DRIVER_JAR_MSG);
 
-    target.generateCache(missingJarNamespace, KEY, LAST_VERSION, SCHEDULER);
+    target.generateCache(missingJarNamespace, KEY, LAST_VERSION, 
CACHE_MANAGER.allocateCache());
   }
 
   @Test
@@ -100,7 +107,7 @@ public class JdbcCacheGeneratorTest
     exception.expect(IllegalStateException.class);
     exception.expectMessage(MISSING_JDB_DRIVER_JAR_MSG);
 
-    target.generateCache(missingJarNamespace, KEY, LAST_VERSION, SCHEDULER);
+    target.generateCache(missingJarNamespace, KEY, LAST_VERSION, 
CACHE_MANAGER.allocateCache());
   }
 
   @SuppressWarnings("SameParameterValue")
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
index 8118783..715da7b 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
@@ -30,7 +30,9 @@ import 
org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
 import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
+import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
 import 
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.joda.time.Period;
@@ -57,6 +59,7 @@ public class NamespacedExtractorModuleTest
 
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private NamespaceExtractionCacheManager cacheManager;
 
   @Before
   public void setUp() throws Exception
@@ -75,10 +78,15 @@ public class NamespacedExtractorModuleTest
     lifecycle = new Lifecycle();
     lifecycle.start();
     NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
+    cacheManager = new OnHeapNamespaceExtractionCacheManager(
+        lifecycle,
+        noopServiceEmitter,
+        new NamespaceExtractionConfig()
+    );
     scheduler = new CacheScheduler(
         noopServiceEmitter,
         factoryMap,
-        new OnHeapNamespaceExtractionCacheManager(lifecycle, 
noopServiceEmitter, new NamespaceExtractionConfig())
+        cacheManager
     );
   }
 
@@ -108,9 +116,10 @@ public class NamespacedExtractorModuleTest
         null,
         null
     );
-    CacheScheduler.VersionedCache versionedCache = 
factory.generateCache(namespace, null, null, scheduler);
-    Assert.assertNotNull(versionedCache);
-    Map<String, String> map = versionedCache.getCache();
+    CacheHandler cache = cacheManager.allocateCache();
+    String version = factory.generateCache(namespace, null, null, cache);
+    Assert.assertNotNull(version);
+    Map<String, String> map = cache.getCache();
     Assert.assertEquals("bar", map.get("foo"));
     Assert.assertEquals(null, map.get("baz"));
   }
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
index bd88d4c..30687e4 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.server.lookup.namespace;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.query.lookup.namespace.StaticMapExtractionNamespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
+import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
 import 
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.junit.After;
@@ -39,6 +41,7 @@ public class StaticMapCacheGeneratorTest
 
   private Lifecycle lifecycle;
   private CacheScheduler scheduler;
+  private NamespaceExtractionCacheManager cacheManager;
 
   @Before
   public void setup() throws Exception
@@ -46,10 +49,15 @@ public class StaticMapCacheGeneratorTest
     lifecycle = new Lifecycle();
     lifecycle.start();
     NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
+    cacheManager = new OnHeapNamespaceExtractionCacheManager(
+        lifecycle,
+        noopServiceEmitter,
+        new NamespaceExtractionConfig()
+    );
     scheduler = new CacheScheduler(
         noopServiceEmitter,
         Collections.emptyMap(),
-        new OnHeapNamespaceExtractionCacheManager(lifecycle, 
noopServiceEmitter, new NamespaceExtractionConfig())
+        cacheManager
     );
   }
 
@@ -64,10 +72,11 @@ public class StaticMapCacheGeneratorTest
   {
     final StaticMapCacheGenerator factory = new StaticMapCacheGenerator();
     final StaticMapExtractionNamespace namespace = new 
StaticMapExtractionNamespace(MAP);
-    CacheScheduler.VersionedCache versionedCache = 
factory.generateCache(namespace, null, null, scheduler);
-    Assert.assertNotNull(versionedCache);
-    Assert.assertEquals(factory.getVersion(), versionedCache.getVersion());
-    Assert.assertEquals(MAP, versionedCache.getCache());
+    CacheHandler cache = cacheManager.allocateCache();
+    final String version = factory.generateCache(namespace, null, null, cache);
+    Assert.assertNotNull(version);
+    Assert.assertEquals(factory.getVersion(), version);
+    Assert.assertEquals(MAP, cache.getCache());
 
   }
 
@@ -76,6 +85,7 @@ public class StaticMapCacheGeneratorTest
   {
     final StaticMapCacheGenerator factory = new StaticMapCacheGenerator();
     final StaticMapExtractionNamespace namespace = new 
StaticMapExtractionNamespace(MAP);
-    factory.generateCache(namespace, null, factory.getVersion(), scheduler);
+    CacheHandler cache = cacheManager.allocateCache();
+    factory.generateCache(namespace, null, factory.getVersion(), cache);
   }
 }
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
index 4846f89..58ef2dc 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
 import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import org.apache.druid.server.lookup.namespace.cache.CacheSchedulerTest;
 import 
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
@@ -170,72 +171,64 @@ public class UriCacheGeneratorTest
     );
 
     final List<Function<Lifecycle, NamespaceExtractionCacheManager>> 
cacheManagerCreators = ImmutableList.of(
-        new Function<Lifecycle, NamespaceExtractionCacheManager>()
-        {
-          @Override
-          public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
-          {
-            return new OnHeapNamespaceExtractionCacheManager(
-                lifecycle,
-                new NoopServiceEmitter(),
-                new NamespaceExtractionConfig()
-            );
-          }
-        },
-        new Function<Lifecycle, NamespaceExtractionCacheManager>()
-        {
-          @Override
-          public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
-          {
-            return new OffHeapNamespaceExtractionCacheManager(
-                lifecycle,
-                new NoopServiceEmitter(),
-                new NamespaceExtractionConfig()
-            );
-          }
-        }
+        lifecycle -> new OnHeapNamespaceExtractionCacheManager(
+            lifecycle,
+            new NoopServiceEmitter(),
+            new NamespaceExtractionConfig()
+        ),
+        lifecycle -> new OffHeapNamespaceExtractionCacheManager(
+            lifecycle,
+            new NoopServiceEmitter(),
+            new NamespaceExtractionConfig()
+        )
     );
-    return new Iterable<Object[]>()
+    return () -> new Iterator<Object[]>()
     {
+      Iterator<Object[]> compressionIt = compressionParams.iterator();
+      Iterator<Function<Lifecycle, NamespaceExtractionCacheManager>> 
cacheManagerCreatorsIt =
+          cacheManagerCreators.iterator();
+      Object[] compressions = compressionIt.next();
+
       @Override
-      public Iterator<Object[]> iterator()
+      public boolean hasNext()
       {
-        return new Iterator<Object[]>()
-        {
-          Iterator<Object[]> compressionIt = compressionParams.iterator();
-          Iterator<Function<Lifecycle, NamespaceExtractionCacheManager>> 
cacheManagerCreatorsIt =
-              cacheManagerCreators.iterator();
-          Object[] compressions = compressionIt.next();
-
-          @Override
-          public boolean hasNext()
-          {
-            return compressionIt.hasNext() || cacheManagerCreatorsIt.hasNext();
-          }
+        return compressionIt.hasNext() || cacheManagerCreatorsIt.hasNext();
+      }
 
-          @Override
-          public Object[] next()
-          {
-            if (cacheManagerCreatorsIt.hasNext()) {
-              Function<Lifecycle, NamespaceExtractionCacheManager> 
cacheManagerCreator = cacheManagerCreatorsIt.next();
-              return new Object[]{compressions[0], compressions[1], 
cacheManagerCreator};
-            } else {
-              cacheManagerCreatorsIt = cacheManagerCreators.iterator();
-              compressions = compressionIt.next();
-              return next();
-            }
-          }
+      @Override
+      public Object[] next()
+      {
+        if (cacheManagerCreatorsIt.hasNext()) {
+          Function<Lifecycle, NamespaceExtractionCacheManager> 
cacheManagerCreator = cacheManagerCreatorsIt.next();
+          return new Object[]{compressions[0], compressions[1], 
cacheManagerCreator};
+        } else {
+          cacheManagerCreatorsIt = cacheManagerCreators.iterator();
+          compressions = compressionIt.next();
+          return next();
+        }
+      }
 
-          @Override
-          public void remove()
-          {
-            throw new UOE("Cannot remove");
-          }
-        };
+      @Override
+      public void remove()
+      {
+        throw new UOE("Cannot remove");
       }
     };
   }
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private final String suffix;
+
+  private final Function<File, OutputStream> outStreamSupplier;
+  private final Lifecycle lifecycle;
+  private final NamespaceExtractionCacheManager cacheManager;
+  private final CacheScheduler scheduler;
+  private File tmpFile;
+  private UriCacheGenerator generator;
+  private UriExtractionNamespace namespace;
+
   public UriCacheGeneratorTest(
       String suffix,
       Function<File, OutputStream> outStreamSupplier,
@@ -245,30 +238,19 @@ public class UriCacheGeneratorTest
     this.suffix = suffix;
     this.outStreamSupplier = outStreamSupplier;
     this.lifecycle = new Lifecycle();
+    this.cacheManager = cacheManagerCreator.apply(lifecycle);
     this.scheduler = new CacheScheduler(
         new NoopServiceEmitter(),
         ImmutableMap.of(UriExtractionNamespace.class, new 
UriCacheGenerator(FINDERS)),
-        cacheManagerCreator.apply(lifecycle)
+        cacheManager
     );
   }
 
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  private final String suffix;
-  private final Function<File, OutputStream> outStreamSupplier;
-  private Lifecycle lifecycle;
-  private CacheScheduler scheduler;
-  private File tmpFile;
-  private File tmpFileParent;
-  private UriCacheGenerator generator;
-  private UriExtractionNamespace namespace;
-
   @Before
   public void setUp() throws Exception
   {
     lifecycle.start();
-    tmpFileParent = new File(temporaryFolder.newFolder(), "☃");
+    File tmpFileParent = new File(temporaryFolder.newFolder(), "☃");
     Assert.assertTrue(tmpFileParent.mkdir());
     Assert.assertTrue(tmpFileParent.isDirectory());
     tmpFile = Files.createTempFile(tmpFileParent.toPath(), 
"druidTestURIExtractionNS", suffix).toFile();
@@ -372,15 +354,16 @@ public class UriCacheGeneratorTest
   {
     Assert.assertEquals(0, scheduler.getActiveEntries());
 
-    CacheScheduler.VersionedCache versionedCache = 
generator.generateCache(namespace, null, null, scheduler);
-    Assert.assertNotNull(versionedCache);
-    Map<String, String> map = versionedCache.getCache();
+    CacheHandler cache = cacheManager.allocateCache();
+    String newVersion = generator.generateCache(namespace, null, null, cache);
+    Assert.assertNotNull(newVersion);
+    Map<String, String> map = cache.getCache();
     Assert.assertEquals("bar", map.get("foo"));
     Assert.assertEquals(null, map.get("baz"));
-    String version = versionedCache.getVersion();
+    String version = newVersion;
     Assert.assertNotNull(version);
 
-    Assert.assertNull(generator.generateCache(namespace, null, version, 
scheduler));
+    Assert.assertNull(generator.generateCache(namespace, null, version, 
cacheManager.allocateCache()));
   }
 
   @Test(expected = FileNotFoundException.class)
@@ -395,7 +378,7 @@ public class UriCacheGeneratorTest
         null
     );
     Assert.assertTrue(new File(namespace.getUri()).delete());
-    generator.generateCache(badNamespace, null, null, scheduler);
+    generator.generateCache(badNamespace, null, null, 
cacheManager.allocateCache());
   }
 
   @Test(expected = FileNotFoundException.class)
@@ -411,7 +394,7 @@ public class UriCacheGeneratorTest
         null
     );
     Assert.assertTrue(new File(namespace.getUri()).delete());
-    generator.generateCache(badNamespace, null, null, scheduler);
+    generator.generateCache(badNamespace, null, null, 
cacheManager.allocateCache());
   }
 
   @Test(expected = IAE.class)
@@ -504,7 +487,7 @@ public class UriCacheGeneratorTest
         null,
         null
     );
-    Assert.assertNotNull(generator.generateCache(extractionNamespace, null, 
null, scheduler));
+    Assert.assertNotNull(generator.generateCache(extractionNamespace, null, 
null, cacheManager.allocateCache()));
   }
 
   @Test(timeout = 60_000L)
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
index ce2f257..51412d7 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
@@ -54,7 +54,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -134,24 +133,12 @@ public class CacheSchedulerTest
     lifecycle.start();
     cacheManager = createCacheManager.apply(lifecycle);
     final Path tmpDir = temporaryFolder.newFolder().toPath();
-    final CacheGenerator<UriExtractionNamespace> cacheGenerator = new
-        CacheGenerator<UriExtractionNamespace>()
-    {
-      @Override
-      public CacheScheduler.VersionedCache generateCache(
-          final UriExtractionNamespace extractionNamespace,
-          final CacheScheduler.EntryImpl<UriExtractionNamespace> id,
-          final String lastVersion,
-          final CacheScheduler scheduler
-      ) throws InterruptedException
-      {
-        Thread.sleep(2); // To make absolutely sure there is a unique 
currentTimeMillis
-        String version = Long.toString(System.currentTimeMillis());
-        CacheScheduler.VersionedCache versionedCache = 
scheduler.createVersionedCache(id, version);
-        // Don't actually read off disk because TravisCI doesn't like that
-        versionedCache.getCache().put(KEY, VALUE);
-        return versionedCache;
-      }
+    final CacheGenerator<UriExtractionNamespace> cacheGenerator = 
(extractionNamespace, id, lastVersion, cache) -> {
+      Thread.sleep(2); // To make absolutely sure there is a unique 
currentTimeMillis
+      String version = Long.toString(System.currentTimeMillis());
+      // Don't actually read off disk because TravisCI doesn't like that
+      cache.getCache().put(KEY, VALUE);
+      return version;
     };
     scheduler = new CacheScheduler(
         new NoopServiceEmitter(),
@@ -193,9 +180,7 @@ public class CacheSchedulerTest
     );
     CacheScheduler.Entry entry = scheduler.schedule(namespace);
     waitFor(entry);
-    Map<String, String> cache = entry.getCache();
-    Assert.assertNull(cache.put("key", "val"));
-    Assert.assertEquals("val", cache.get("key"));
+    Assert.assertEquals(VALUE, entry.getCache().get(KEY));
   }
 
   @Test(timeout = 60_000L)
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
index c583b34..b6a3724 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
@@ -56,7 +56,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -126,160 +125,117 @@ public class JdbcExtractionNamespaceTest
     setupTeardownService =
         MoreExecutors.listeningDecorator(Execs.multiThreaded(2, 
"JDBCExtractionNamespaceTeardown--%s"));
     final ListenableFuture<Handle> setupFuture = setupTeardownService.submit(
-        new Callable<Handle>()
-        {
-          @Override
-          public Handle call()
-          {
-            final Handle handle = 
derbyConnectorRule.getConnector().getDBI().open();
-            Assert.assertEquals(
-                0,
-                handle.createStatement(
-                    StringUtils.format(
-                        "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s 
VARCHAR(64), %s VARCHAR(64))",
-                        TABLE_NAME,
-                        TS_COLUMN,
-                        FILTER_COLUMN,
-                        KEY_NAME,
-                        VAL_NAME
-                    )
-                ).setQueryTimeout(1).execute()
-            );
-            handle.createStatement(StringUtils.format("TRUNCATE TABLE %s", 
TABLE_NAME)).setQueryTimeout(1).execute();
-            handle.commit();
-            closer.register(new Closeable()
-            {
-              @Override
-              public void close() throws IOException
-              {
-                handle.createStatement("DROP TABLE " + 
TABLE_NAME).setQueryTimeout(1).execute();
-                final ListenableFuture future = 
setupTeardownService.submit(new Runnable()
-                {
-                  @Override
-                  public void run()
-                  {
-                    handle.close();
-                  }
-                });
-                try (Closeable closeable = new Closeable()
-                {
-                  @Override
-                  public void close()
-                  {
-                    future.cancel(true);
-                  }
-                }) {
-                  future.get(10, TimeUnit.SECONDS);
-                }
-                catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                  throw new IOException("Error closing handle", e);
-                }
-              }
-            });
-            closer.register(new Closeable()
+        () -> {
+          final Handle handle = 
derbyConnectorRule.getConnector().getDBI().open();
+          Assert.assertEquals(
+              0,
+              handle.createStatement(
+                  StringUtils.format(
+                      "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s 
VARCHAR(64), %s VARCHAR(64))",
+                      TABLE_NAME,
+                      TS_COLUMN,
+                      FILTER_COLUMN,
+                      KEY_NAME,
+                      VAL_NAME
+                  )
+              ).setQueryTimeout(1).execute()
+          );
+          handle.createStatement(StringUtils.format("TRUNCATE TABLE %s", 
TABLE_NAME)).setQueryTimeout(1).execute();
+          handle.commit();
+          closer.register(() -> {
+            handle.createStatement("DROP TABLE " + 
TABLE_NAME).setQueryTimeout(1).execute();
+            final ListenableFuture future = setupTeardownService.submit(new 
Runnable()
             {
               @Override
-              public void close()
+              public void run()
               {
-                if (scheduler == null) {
-                  return;
-                }
-                Assert.assertEquals(0, scheduler.getActiveEntries());
+                handle.close();
               }
             });
-            for (Map.Entry<String, String[]> entry : RENAMES.entrySet()) {
-              try {
-                String key = entry.getKey();
-                String value = entry.getValue()[0];
-                String filter = entry.getValue()[1];
-                insertValues(handle, key, value, filter, "2015-01-01 
00:00:00");
-              }
-              catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-              }
+            try (Closeable ignored = () -> future.cancel(true)) {
+              future.get(10, TimeUnit.SECONDS);
             }
+            catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+              throw new IOException("Error closing handle", e);
+            }
+          });
+          closer.register(() -> {
+            if (scheduler == null) {
+              return;
+            }
+            Assert.assertEquals(0, scheduler.getActiveEntries());
+          });
+          for (Map.Entry<String, String[]> entry : RENAMES.entrySet()) {
+            try {
+              String key = entry.getKey();
+              String value = entry.getValue()[0];
+              String filter = entry.getValue()[1];
+              insertValues(handle, key, value, filter, "2015-01-01 00:00:00");
+            }
+            catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+          }
 
-            NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
-            scheduler = new CacheScheduler(
-                noopServiceEmitter,
-                ImmutableMap.of(
-                    JdbcExtractionNamespace.class,
-                    new CacheGenerator<JdbcExtractionNamespace>()
+          NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
+          scheduler = new CacheScheduler(
+              noopServiceEmitter,
+              ImmutableMap.of(
+                  JdbcExtractionNamespace.class,
+                  new CacheGenerator<JdbcExtractionNamespace>()
+                  {
+                    private final JdbcCacheGenerator delegate =
+                        new JdbcCacheGenerator();
+
+                    @Override
+                    public String generateCache(
+                        final JdbcExtractionNamespace namespace,
+                        final 
CacheScheduler.EntryImpl<JdbcExtractionNamespace> id,
+                        final String lastVersion,
+                        final CacheHandler cache
+                    ) throws InterruptedException
                     {
-                      private final JdbcCacheGenerator delegate =
-                          new JdbcCacheGenerator();
-
-                      @Override
-                      public CacheScheduler.VersionedCache generateCache(
-                          final JdbcExtractionNamespace namespace,
-                          final 
CacheScheduler.EntryImpl<JdbcExtractionNamespace> id,
-                          final String lastVersion,
-                          final CacheScheduler scheduler
-                      ) throws InterruptedException
-                      {
-                        updateLock.lockInterruptibly();
+                      updateLock.lockInterruptibly();
+                      try {
+                        log.debug("Running cache generator");
                         try {
-                          log.debug("Running cache generator");
-                          try {
-                            return delegate.generateCache(namespace, id, 
lastVersion, scheduler);
-                          }
-                          finally {
-                            updates.incrementAndGet();
-                          }
+                          return delegate.generateCache(namespace, id, 
lastVersion, cache);
                         }
                         finally {
-                          updateLock.unlock();
+                          updates.incrementAndGet();
                         }
                       }
-                    }
-                ),
-                new OnHeapNamespaceExtractionCacheManager(
-                    lifecycle,
-                    noopServiceEmitter,
-                    new NamespaceExtractionConfig()
-                )
-            );
-            try {
-              lifecycle.start();
-            }
-            catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-            closer.register(
-                new Closeable()
-                {
-                  @Override
-                  public void close() throws IOException
-                  {
-                    final ListenableFuture future = 
setupTeardownService.submit(
-                        new Runnable()
-                        {
-                          @Override
-                          public void run()
-                          {
-                            lifecycle.stop();
-                          }
-                        }
-                    );
-                    try (final Closeable closeable = new Closeable()
-                    {
-                      @Override
-                      public void close()
-                      {
-                        future.cancel(true);
+                      finally {
+                        updateLock.unlock();
                       }
-                    }) {
-                      future.get(30, TimeUnit.SECONDS);
-                    }
-                    catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                      throw new IOException("Error stopping lifecycle", e);
                     }
                   }
-                }
-            );
-            return handle;
+              ),
+              new OnHeapNamespaceExtractionCacheManager(
+                  lifecycle,
+                  noopServiceEmitter,
+                  new NamespaceExtractionConfig()
+              )
+          );
+          try {
+            lifecycle.start();
           }
+          catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          closer.register(
+              () -> {
+                final ListenableFuture future = setupTeardownService.submit(() 
-> lifecycle.stop());
+                try (final Closeable ignored = () -> future.cancel(true)) {
+                  future.get(30, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                  throw new IOException("Error stopping lifecycle", e);
+                }
+              }
+          );
+          return handle;
         }
     );
 
@@ -293,36 +249,26 @@ public class JdbcExtractionNamespaceTest
   public void tearDown() throws InterruptedException, ExecutionException, 
TimeoutException, IOException
   {
     final ListenableFuture<?> tearDownFuture = setupTeardownService.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              closer.close();
-            }
-            catch (IOException e) {
-              throw new RuntimeException(e);
-            }
+        () -> {
+          try {
+            closer.close();
           }
-        }
-    );
-    try (final Closeable closeable = new Closeable()
-    {
-      @Override
-      public void close() throws IOException
-      {
-        setupTeardownService.shutdownNow();
-        try {
-          if (!setupTeardownService.awaitTermination(60, TimeUnit.SECONDS)) {
-            log.error("Tear down service didn't finish");
+          catch (IOException e) {
+            throw new RuntimeException(e);
           }
         }
-        catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted", e);
+    );
+    try (final Closeable ignored = () -> {
+      setupTeardownService.shutdownNow();
+      try {
+        if (!setupTeardownService.awaitTermination(60, TimeUnit.SECONDS)) {
+          log.error("Tear down service didn't finish");
         }
       }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted", e);
+      }
     }) {
       tearDownFuture.get(60, TimeUnit.SECONDS);
     }
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
index 6342bf5..e497f18 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
@@ -82,21 +82,16 @@ public class NamespaceExtractionCacheManagersTest
     try {
       for (int i = 0; i < concurrentThreads; ++i) {
         futures.add(service.submit(
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
-                try {
-                  thunder.await();
-                }
-                catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-                for (int i = 0; i < 1000; ++i) {
-                  CacheHandler cacheHandler = manager.createCache();
-                  cacheHandler.close();
-                }
+            () -> {
+              try {
+                thunder.await();
+              }
+              catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              for (int i1 = 0; i1 < 1000; ++i1) {
+                CacheHandler cacheHandler = manager.createCache();
+                cacheHandler.close();
               }
             }
         ));
diff --git 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
index 1210620..758f2ce 100644
--- 
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
+++ 
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
 import org.junit.Assert;
@@ -40,6 +41,37 @@ public class OnHeapNamespaceExtractionCacheManagerTest
   @Test
   public void testInjection()
   {
+    final NamespaceExtractionCacheManager manager = getCacheManager();
+    Assert.assertEquals(OnHeapNamespaceExtractionCacheManager.class, 
manager.getClass());
+  }
+
+  @Test
+  public void testImmutableAfterAttach()
+  {
+    NamespaceExtractionCacheManager manager = getCacheManager();
+    CacheHandler handler = manager.allocateCache();
+    handler.getCache().put("some key", "some value");
+    CacheHandler immutableHandler = manager.attachCache(handler);
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> immutableHandler.getCache().put("other key", "other value")
+    );
+  }
+
+  @Test
+  public void testIllegalToDoubleAttach()
+  {
+    NamespaceExtractionCacheManager manager = getCacheManager();
+    CacheHandler handler = manager.createCache();
+    handler.getCache().put("some key", "some value");
+    Assert.assertThrows(
+        ISE.class,
+        () -> manager.attachCache(handler)
+    );
+  }
+
+  private NamespaceExtractionCacheManager getCacheManager()
+  {
     final Injector injector = Initialization.makeInjectorWithModules(
         GuiceInjectors.makeStartupInjector(),
         ImmutableList.of(
@@ -61,6 +93,6 @@ public class OnHeapNamespaceExtractionCacheManagerTest
     properties.clear();
     properties.put(NamespaceExtractionModule.TYPE_PREFIX, "onHeap");
     final NamespaceExtractionCacheManager manager = 
injector.getInstance(NamespaceExtractionCacheManager.class);
-    Assert.assertEquals(OnHeapNamespaceExtractionCacheManager.class, 
manager.getClass());
+    return manager;
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to