This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0600772 use a non-concurrent map for lookups-cached-global unless
incremental updates are actually required (#12293)
0600772 is described below
commit 0600772cceec633a7754a68d6b4ff5ed21af8795
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Mar 8 21:54:25 2022 -0800
use a non-concurrent map for lookups-cached-global unless incremental
updates are actually required (#12293)
* use a non-concurrent map for lookups-cached-global unless incremental
updates are actually required
* adjustments
* fix test
---
.../query/lookup/KafkaLookupExtractorFactory.java | 4 +-
.../query/lookup/namespace/CacheGenerator.java | 15 +-
.../lookup/namespace/JdbcCacheGenerator.java | 27 +-
.../lookup/namespace/StaticMapCacheGenerator.java | 12 +-
.../server/lookup/namespace/UriCacheGenerator.java | 12 +-
.../lookup/namespace/cache/CacheHandler.java | 13 +-
.../server/lookup/namespace/cache/CacheProxy.java | 43 ----
.../lookup/namespace/cache/CacheScheduler.java | 60 ++---
.../cache/NamespaceExtractionCacheManager.java | 39 +--
.../OffHeapNamespaceExtractionCacheManager.java | 52 +++-
.../OnHeapNamespaceExtractionCacheManager.java | 35 ++-
.../lookup/namespace/JdbcCacheGeneratorTest.java | 13 +-
.../namespace/NamespacedExtractorModuleTest.java | 17 +-
.../namespace/StaticMapCacheGeneratorTest.java | 22 +-
.../lookup/namespace/UriCacheGeneratorTest.java | 139 +++++------
.../lookup/namespace/cache/CacheSchedulerTest.java | 29 +--
.../cache/JdbcExtractionNamespaceTest.java | 272 +++++++++------------
.../NamespaceExtractionCacheManagersTest.java | 25 +-
.../OnHeapNamespaceExtractionCacheManagerTest.java | 34 ++-
19 files changed, 412 insertions(+), 451 deletions(-)
diff --git
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index a50402a..f3d2c1e 100644
---
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -53,7 +53,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -154,8 +153,9 @@ public class KafkaLookupExtractorFactory implements
LookupExtractorFactory
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic,
factoryId);
+ // this creates a ConcurrentMap
cacheHandler = cacheManager.createCache();
- final ConcurrentMap<String, String> map = cacheHandler.getCache();
+ final Map<String, String> map = cacheHandler.getCache();
mapRef.set(map);
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
index 243d990..280e03e 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/CacheGenerator.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.lookup.namespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import javax.annotation.Nullable;
@@ -31,23 +32,21 @@ public interface CacheGenerator<T extends
ExtractionNamespace>
/**
* If the lookup source, encapsulated by this {@code CacheGenerator}, has
data newer than identified
* by the given {@code lastVersion} (which is null at the first run of this
method, or the version from the previous
- * run), this method creates a new {@code CacheScheduler.VersionedCache}
with {@link
- * CacheScheduler#createVersionedCache}, called on the given {@code
scheduler}, with the version string identifying
- * the current version of lookup source, populates the created {@code
VersionedCache} and returns it. If the lookup
- * source is up-to-date, this methods returns null.
+ * run), this method populates a {@link CacheHandler} and returns the
version string identifying the current version
+ * of the lookup source. If the lookup source is up-to-date, this method
returns null.
*
* @param namespace The ExtractionNamespace for which to generate cache.
* @param id An object uniquely corresponding to the {@link
CacheScheduler.Entry}, for which this generateCache()
* method is called. Also it has the same toString()
representation, that is useful for logging
* @param lastVersion The version which was last cached
- * @param scheduler Should be used only to call {@link
CacheScheduler#createVersionedCache}.
- * @return the new cache along with the new version, or null if the last
version is up-to-date.
+ * @param cache a cache to populate
+ * @return the new version, or null if the last version is up-to-date.
*/
@Nullable
- CacheScheduler.VersionedCache generateCache(
+ String generateCache(
T namespace,
CacheScheduler.EntryImpl<T> id,
String lastVersion,
- CacheScheduler scheduler
+ CacheHandler cache
) throws Exception;
}
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
index 3a73eee..ba35a5f 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.utils.JvmUtils;
import org.skife.jdbi.v2.DBI;
@@ -38,7 +39,6 @@ import org.skife.jdbi.v2.util.TimestampMapper;
import javax.annotation.Nullable;
import java.sql.Timestamp;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -57,11 +57,11 @@ public final class JdbcCacheGenerator implements
CacheGenerator<JdbcExtractionNa
@Override
@Nullable
- public CacheScheduler.VersionedCache generateCache(
+ public String generateCache(
final JdbcExtractionNamespace namespace,
final CacheScheduler.EntryImpl<JdbcExtractionNamespace> entryId,
final String lastVersion,
- final CacheScheduler scheduler
+ final CacheHandler cache
)
{
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT :
Long.parseLong(lastVersion);
@@ -76,10 +76,7 @@ public final class JdbcCacheGenerator implements
CacheGenerator<JdbcExtractionNa
}
catch (UnableToObtainConnectionException e) {
if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
- throw new ISE(
- e,
- JDBC_DRIVER_JAR_FILES_MISSING_ERROR
- );
+ throw new ISE(e, JDBC_DRIVER_JAR_FILES_MISSING_ERROR);
} else {
throw e;
}
@@ -94,16 +91,15 @@ public final class JdbcCacheGenerator implements
CacheGenerator<JdbcExtractionNa
} else {
newVersion = StringUtils.format("%d", dbQueryStart);
}
- final CacheScheduler.VersionedCache versionedCache =
scheduler.createVersionedCache(entryId, newVersion);
final long startNs = System.nanoTime();
try (
Handle handle = getHandle(entryId, namespace);
- ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle,
namespace)) {
- final Map<String, String> cache = versionedCache.getCache();
+ ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle,
namespace)
+ ) {
final MapPopulator.PopulateResult populateResult =
MapPopulator.populateAndWarnAtByteLimit(
pairs,
- cache,
+ cache.getCache(),
(long) (MAX_MEMORY * namespace.getMaxHeapPercentage() / 100.0),
null == entryId ? null : entryId.toString()
);
@@ -115,21 +111,18 @@ public final class JdbcCacheGenerator implements
CacheGenerator<JdbcExtractionNa
entryId,
duration
);
- return versionedCache;
+ return newVersion;
}
catch (UnableToObtainConnectionException e) {
if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
- throw new ISE(
- e,
- JDBC_DRIVER_JAR_FILES_MISSING_ERROR
- );
+ throw new ISE(e, JDBC_DRIVER_JAR_FILES_MISSING_ERROR);
} else {
throw e;
}
}
catch (Throwable t) {
try {
- versionedCache.close();
+ cache.close();
}
catch (Exception e) {
t.addSuppressed(e);
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
index f291ae9..999f77c 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGenerator.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.lookup.namespace;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.StaticMapExtractionNamespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import javax.annotation.Nullable;
@@ -32,11 +33,11 @@ public final class StaticMapCacheGenerator implements
CacheGenerator<StaticMapEx
@Override
@Nullable
- public CacheScheduler.VersionedCache generateCache(
+ public String generateCache(
final StaticMapExtractionNamespace namespace,
final CacheScheduler.EntryImpl<StaticMapExtractionNamespace> id,
final String lastVersion,
- final CacheScheduler scheduler
+ final CacheHandler cache
)
{
if (lastVersion != null) {
@@ -46,14 +47,13 @@ public final class StaticMapCacheGenerator implements
CacheGenerator<StaticMapEx
"StaticMapCacheGenerator could only be configured for a namespace
which is scheduled "
+ "to be updated once, not periodically. Last version: `" +
lastVersion + "`");
}
- CacheScheduler.VersionedCache versionedCache =
scheduler.createVersionedCache(id, version);
try {
- versionedCache.getCache().putAll(namespace.getMap());
- return versionedCache;
+ cache.getCache().putAll(namespace.getMap());
+ return version;
}
catch (Throwable t) {
try {
- versionedCache.close();
+ cache.close();
}
catch (Exception e) {
t.addSuppressed(e);
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
index f774cec..3c10ef6 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/UriCacheGenerator.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.segment.loading.URIDataPuller;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.utils.CompressionUtils;
import org.apache.druid.utils.JvmUtils;
@@ -62,11 +63,11 @@ public final class UriCacheGenerator implements
CacheGenerator<UriExtractionName
@Override
@Nullable
- public CacheScheduler.VersionedCache generateCache(
+ public String generateCache(
final UriExtractionNamespace extractionNamespace,
final CacheScheduler.EntryImpl<UriExtractionNamespace> entryId,
@Nullable final String lastVersion,
- final CacheScheduler scheduler
+ final CacheHandler cache
) throws Exception
{
final boolean doSearch = extractionNamespace.getUriPrefix() != null;
@@ -143,14 +144,13 @@ public final class UriCacheGenerator implements
CacheGenerator<UriExtractionName
}
};
- final CacheScheduler.VersionedCache versionedCache =
scheduler.createVersionedCache(entryId, version);
try {
final long startNs = System.nanoTime();
final MapPopulator.PopulateResult populateResult = new
MapPopulator<>(
extractionNamespace.getNamespaceParseSpec().getParser()
).populateAndWarnAtByteLimit(
source,
- versionedCache.getCache(),
+ cache.getCache(),
(long) (MAX_MEMORY *
extractionNamespace.getMaxHeapPercentage() / 100.0),
null == entryId ? null : entryId.toString()
);
@@ -163,11 +163,11 @@ public final class UriCacheGenerator implements
CacheGenerator<UriExtractionName
entryId,
duration
);
- return versionedCache;
+ return version;
}
catch (Throwable t) {
try {
- versionedCache.close();
+ cache.close();
}
catch (Exception e) {
t.addSuppressed(e);
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
index 567a944..58a4a99 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheHandler.java
@@ -21,17 +21,18 @@ package org.apache.druid.server.lookup.namespace.cache;
import org.apache.druid.java.util.common.logger.Logger;
-import java.util.concurrent.ConcurrentMap;
+import java.io.Closeable;
+import java.util.Map;
-public final class CacheHandler implements AutoCloseable
+public final class CacheHandler implements AutoCloseable, Closeable
{
private static final Logger log = new Logger(CacheHandler.class);
private final NamespaceExtractionCacheManager cacheManager;
- private final ConcurrentMap<String, String> cache;
+ private final Map<String, String> cache;
final Object id;
- CacheHandler(NamespaceExtractionCacheManager cacheManager,
ConcurrentMap<String, String> cache, Object id)
+ CacheHandler(NamespaceExtractionCacheManager cacheManager, Map<String,
String> cache, Object id)
{
log.debug("Creating %s", super.toString());
this.cacheManager = cacheManager;
@@ -39,7 +40,7 @@ public final class CacheHandler implements AutoCloseable
this.id = id;
}
- public ConcurrentMap<String, String> getCache()
+ public Map<String, String> getCache()
{
return cache;
}
@@ -48,7 +49,7 @@ public final class CacheHandler implements AutoCloseable
public void close()
{
cacheManager.disposeCache(this);
- // Log statement after disposeCache(), because logging may fail (e. g. in
shutdown hooks)
+ // Log statement after disposeCache(), because logging may fail (e.g. in
shutdown hooks)
log.debug("Closed %s", super.toString());
}
}
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheProxy.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheProxy.java
deleted file mode 100644
index de23853..0000000
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheProxy.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.lookup.namespace.cache;
-
-import com.google.common.collect.ForwardingConcurrentMap;
-
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Used in {@link OffHeapNamespaceExtractionCacheManager#createCache()}
- */
-final class CacheProxy extends ForwardingConcurrentMap<String, String>
-{
- private final ConcurrentMap<String, String> delegate;
-
- CacheProxy(ConcurrentMap<String, String> delegate)
- {
- this.delegate = delegate;
- }
-
- @Override
- protected ConcurrentMap<String, String> delegate()
- {
- return delegate;
- }
-}
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
index fb2e229..43d85d1 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
@@ -145,7 +145,7 @@ public final class CacheScheduler
{
private final T namespace;
private final String asString;
- private final AtomicReference<CacheState> cacheStateHolder = new
AtomicReference<CacheState>(NoCache.CACHE_NOT_INITIALIZED);
+ private final AtomicReference<CacheState> cacheStateHolder = new
AtomicReference<>(NoCache.CACHE_NOT_INITIALIZED);
private final Future<?> updaterFuture;
private final Cleaners.Cleanable entryCleanable;
private final CacheGenerator<T> cacheGenerator;
@@ -169,27 +169,13 @@ public final class CacheScheduler
private Cleaners.Cleanable createCleaner(Entry<T> entry)
{
- return Cleaners.register(entry, new Runnable()
- {
- @Override
- public void run()
- {
- closeFromCleaner();
- }
- });
+ return Cleaners.register(entry, this::closeFromCleaner);
}
private Future<?> schedule(final T namespace)
{
final long updateMs = namespace.getPollMs();
- Runnable command = new Runnable()
- {
- @Override
- public void run()
- {
- updateCache();
- }
- };
+ Runnable command = this::updateCache;
if (updateMs > 0) {
return
cacheManager.scheduledExecutorService().scheduleAtFixedRate(command, 0,
updateMs, TimeUnit.MILLISECONDS);
} else {
@@ -224,12 +210,20 @@ public final class CacheScheduler
private void tryUpdateCache(String currentVersion) throws Exception
{
boolean updatedCacheSuccessfully = false;
- VersionedCache newVersionedCache = null;
+ CacheHandler newCache = null;
try {
- newVersionedCache = cacheGenerator.generateCache(namespace, this,
currentVersion, CacheScheduler.this
+ updatesStarted.incrementAndGet();
+ newCache = CacheScheduler.this.cacheManager.allocateCache();
+ final String newVersion = cacheGenerator.generateCache(
+ namespace,
+ this,
+ currentVersion,
+ newCache
);
- if (newVersionedCache != null) {
- CacheState previousCacheState = swapCacheState(newVersionedCache);
+ if (newVersion != null) {
+ newCache = cacheManager.attachCache(newCache);
+ final VersionedCache newVersionedCache = new
VersionedCache(String.valueOf(this), newVersion, newCache);
+ final CacheState previousCacheState =
swapCacheState(newVersionedCache);
if (previousCacheState != NoCache.ENTRY_CLOSED) {
updatedCacheSuccessfully = true;
if (previousCacheState instanceof VersionedCache) {
@@ -246,8 +240,8 @@ public final class CacheScheduler
}
catch (Throwable t) {
try {
- if (newVersionedCache != null && !updatedCacheSuccessfully) {
- newVersionedCache.close();
+ if (newCache != null && !updatedCacheSuccessfully) {
+ newCache.close();
}
log.error(t, "Failed to update %s", this);
}
@@ -381,16 +375,16 @@ public final class CacheScheduler
ENTRY_CLOSED
}
- public final class VersionedCache implements CacheState, AutoCloseable
+ public static final class VersionedCache implements CacheState, AutoCloseable
{
final String entryId;
final CacheHandler cacheHandler;
final String version;
- private VersionedCache(String entryId, String version)
+ private VersionedCache(String entryId, String version, CacheHandler cache)
{
this.entryId = entryId;
- this.cacheHandler = cacheManager.createCache();
+ this.cacheHandler = cache;
this.version = version;
}
@@ -458,20 +452,6 @@ public final class CacheScheduler
);
}
- /**
- * This method should be used from {@link CacheGenerator#generateCache}
implementations, to obtain a {@link
- * VersionedCache} to be returned.
- *
- * @param entryId an object uniquely corresponding to the {@link
CacheScheduler.Entry}, for which VersionedCache is
- * created
- * @param version version, associated with the cache
- */
- public VersionedCache createVersionedCache(@Nullable EntryImpl<? extends
ExtractionNamespace> entryId, String version)
- {
- updatesStarted.incrementAndGet();
- return new VersionedCache(String.valueOf(entryId), version);
- }
-
@VisibleForTesting
long updatesStarted()
{
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
index 867822c..268a193 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
@@ -61,24 +61,20 @@ public abstract class NamespaceExtractionCacheManager
);
ExecutorServices.manageLifecycle(lifecycle, scheduledExecutorService);
scheduledExecutorService.scheduleAtFixedRate(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- monitor(serviceEmitter);
- }
- catch (Exception e) {
- log.error(e, "Error emitting namespace stats");
- if (Thread.currentThread().isInterrupted()) {
- throw new RuntimeException(e);
- }
+ () -> {
+ try {
+ monitor(serviceEmitter);
+ }
+ catch (Exception e) {
+ log.error(e, "Error emitting namespace stats");
+ if (Thread.currentThread().isInterrupted()) {
+ throw new RuntimeException(e);
}
}
},
1,
- 10, TimeUnit.MINUTES
+ 10,
+ TimeUnit.MINUTES
);
}
@@ -99,8 +95,23 @@ public abstract class NamespaceExtractionCacheManager
return scheduledExecutorService.awaitTermination(time, unit);
}
+ /**
+ * Creates a thread-safe, permanently mutable cache. Use this method if you
need a cache which can be incrementally
+ * updated, indefinitely, at the cost of the overhead of synchronization
+ */
public abstract CacheHandler createCache();
+ /**
+ * Create a cache that is intended to be populated before use, and then
passed to {@link #attachCache(CacheHandler)}.
+ */
+ public abstract CacheHandler allocateCache();
+
+ /**
+ * Attach a cache created with {@link #allocateCache()} to the cache
manager. The cache returned from this method
+ * should be treated as read-only (though some implementations may still
allow modification, it is not safe to do so).
+ */
+ public abstract CacheHandler attachCache(CacheHandler cache);
+
abstract void disposeCache(CacheHandler cacheHandler);
abstract int cacheCount();
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
index 51a0f79..419182d 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.lookup.namespace.cache;
import com.google.common.base.Throwables;
+import com.google.common.collect.ForwardingConcurrentMap;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.Cleaners;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@@ -54,11 +55,13 @@ public class OffHeapNamespaceExtractionCacheManager extends
NamespaceExtractionC
*
* <p>In case of actual race, we don't wait in those methods until the
other one, which manages to switch this flag
* first, completes. This could result into the situation that neither one
completes, if the JVM is shutting down
- * and the thread from which {@link Cleaners.Cleanable#clean()}
(delegating to {@link #run()}) is called started the disposal
- * operation, then more deterministic shutdown hook / lifecycle.stop(),
which may call {@link #disposeManually()}
- * completed early, and then the whole process shuts down before {@link
Cleaners.Cleanable#clean()} completes, because shutdown
- * is not blocked by it. However this should be harmless because anyway we
remove the whole MapDB's file in
- * lifecycle.stop() (see {@link
OffHeapNamespaceExtractionCacheManager#OffHeapNamespaceExtractionCacheManager}).
+ * and the thread from which {@link Cleaners.Cleanable#clean()}
(delegating to {@link #run()}) is called started
+ * the disposal operation, then more deterministic shutdown hook /
lifecycle.stop(), which may call
+ * {@link #disposeManually()} completed early, and then the whole process
shuts down before
+ * {@link Cleaners.Cleanable#clean()} completes, because shutdown is not
blocked by it. However this should be
+ * harmless because anyway we remove the whole MapDB's file in
lifecycle.stop()
+ * (see {@link
OffHeapNamespaceExtractionCacheManager#OffHeapNamespaceExtractionCacheManager}).
+ *
* However if we persist off-heap DB between JVM runs, this decision
should be revised.
*/
final AtomicBoolean disposed = new AtomicBoolean(false);
@@ -69,8 +72,8 @@ public class OffHeapNamespaceExtractionCacheManager extends
NamespaceExtractionC
}
/**
- * To be called by the JVM via {@link Cleaners.Cleanable#clean()}. The
only difference from {@link #disposeManually()} is
- * exception treatment.
+ * To be called by the JVM via {@link Cleaners.Cleanable#clean()}. The
only difference from
+ * {@link #disposeManually()} is exception treatment.
*/
@Override
public void run()
@@ -127,6 +130,22 @@ public class OffHeapNamespaceExtractionCacheManager
extends NamespaceExtractionC
}
}
+ private static final class CacheProxy extends
ForwardingConcurrentMap<String, String>
+ {
+ private final ConcurrentMap<String, String> delegate;
+
+ CacheProxy(ConcurrentMap<String, String> delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected ConcurrentMap<String, String> delegate()
+ {
+ return delegate;
+ }
+ }
+
private final DB mmapDB;
private final File tmpFile;
private AtomicLong mapDbKeyCounter = new AtomicLong(0);
@@ -208,9 +227,9 @@ public class OffHeapNamespaceExtractionCacheManager extends
NamespaceExtractionC
}
}
MapDbCacheDisposer cacheDisposer = new MapDbCacheDisposer(mapDbKey);
- // Cleaner is "the second level of defence". Normally all users of
createCache() must call disposeCache() with
- // the returned CacheHandler instance manually. But if they don't do this
for whatever reason, JVM will cleanup
- // the cache itself.
+ // Cleaner is "the second level of defence". Normally all users of
createCache() must call disposeCache()
+ // with the returned CacheHandler instance manually. But if they don't do
this for whatever reason, JVM will
+ // cleanup the cache itself.
Cleaners.Cleanable cleanable = Cleaners.register(cache, cacheDisposer);
MapDbCacheDisposerAndCleaner disposerAndCleaner = new
MapDbCacheDisposerAndCleaner(
cacheDisposer,
@@ -220,6 +239,19 @@ public class OffHeapNamespaceExtractionCacheManager
extends NamespaceExtractionC
}
@Override
+ public CacheHandler allocateCache()
+ {
+ return createCache();
+ }
+
+ @Override
+ public CacheHandler attachCache(CacheHandler cache)
+ {
+ // nothing to do: allocateCache() delegates to createCache(), there is no
specialized implementation for the populate-then-read-only pattern
+ return cache;
+ }
+
+ @Override
void disposeCache(CacheHandler cacheHandler)
{
MapDbCacheDisposerAndCleaner disposerAndCleaner =
(MapDbCacheDisposerAndCleaner) cacheHandler.id;
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
index 9c6871e..287d927 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
@@ -29,6 +29,7 @@ import
org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
import java.lang.ref.WeakReference;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -50,9 +51,7 @@ public class OnHeapNamespaceExtractionCacheManager extends
NamespaceExtractionCa
* <p>{@link WeakReference} doesn't override Object's identity equals() and
hashCode(), so effectively this map plays
* like concurrent {@link java.util.IdentityHashMap}.
*/
- private final Set<WeakReference<ConcurrentMap<String, String>>> caches =
Collections.newSetFromMap(
- new ConcurrentHashMap<WeakReference<ConcurrentMap<String, String>>,
Boolean>()
- );
+ private final Set<WeakReference<Map<String, String>>> caches =
Collections.newSetFromMap(new ConcurrentHashMap<>());
@Inject
public OnHeapNamespaceExtractionCacheManager(
@@ -66,7 +65,7 @@ public class OnHeapNamespaceExtractionCacheManager extends
NamespaceExtractionCa
private void expungeCollectedCaches()
{
- for (Iterator<WeakReference<ConcurrentMap<String, String>>> iterator =
caches.iterator(); iterator.hasNext(); ) {
+ for (Iterator<WeakReference<Map<String, String>>> iterator =
caches.iterator(); iterator.hasNext(); ) {
WeakReference<?> cacheRef = iterator.next();
if (cacheRef.get() == null) {
// This may not necessarily mean leak of CacheHandler, because
disposeCache() may be called concurrently with
@@ -85,13 +84,35 @@ public class OnHeapNamespaceExtractionCacheManager extends
NamespaceExtractionCa
public CacheHandler createCache()
{
ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
- WeakReference<ConcurrentMap<String, String>> cacheRef = new
WeakReference<>(cache);
+ WeakReference<Map<String, String>> cacheRef = new WeakReference<>(cache);
expungeCollectedCaches();
caches.add(cacheRef);
return new CacheHandler(this, cache, cacheRef);
}
@Override
+ public CacheHandler allocateCache()
+ {
+ Map<String, String> cache = new HashMap<>();
+ // untracked, but disposing will explode if we don't create a weak
reference here
+ return new CacheHandler(this, cache, new WeakReference<>(cache));
+ }
+
+ @Override
+ public CacheHandler attachCache(CacheHandler cache)
+ {
+ if (caches.contains((WeakReference<Map<String, String>>) cache.id)) {
+ throw new ISE("cache [%s] is already attached", cache.id);
+ }
+ // this cache is not thread-safe, make sure nothing ever writes to it
+ Map<String, String> immutable =
Collections.unmodifiableMap(cache.getCache());
+ WeakReference<Map<String, String>> cacheRef = new
WeakReference<>(immutable);
+ expungeCollectedCaches();
+ caches.add(cacheRef);
+ return new CacheHandler(this, immutable, cacheRef);
+ }
+
+ @Override
void disposeCache(CacheHandler cacheHandler)
{
if (!(cacheHandler.id instanceof WeakReference)) {
@@ -113,8 +134,8 @@ public class OnHeapNamespaceExtractionCacheManager extends
NamespaceExtractionCa
long numEntries = 0;
long size = 0;
expungeCollectedCaches();
- for (WeakReference<ConcurrentMap<String, String>> cacheRef : caches) {
- final ConcurrentMap<String, String> cache = cacheRef.get();
+ for (WeakReference<Map<String, String>> cacheRef : caches) {
+ final Map<String, String> cache = cacheRef.get();
if (cache == null) {
continue;
}
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
index 2f6463a..ff27b50 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
@@ -26,6 +26,7 @@ import
org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
+import
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
@@ -50,10 +51,16 @@ public class JdbcCacheGeneratorTest
private static final ServiceEmitter SERVICE_EMITTER = new
NoopServiceEmitter();
+ private static final NamespaceExtractionCacheManager CACHE_MANAGER = new
OnHeapNamespaceExtractionCacheManager(
+ new Lifecycle(),
+ SERVICE_EMITTER,
+ new NamespaceExtractionConfig()
+ );
+
private static final CacheScheduler SCHEDULER = new CacheScheduler(
SERVICE_EMITTER,
Collections.emptyMap(),
- new OnHeapNamespaceExtractionCacheManager(new Lifecycle(),
SERVICE_EMITTER, new NamespaceExtractionConfig())
+ CACHE_MANAGER
);
private static final String LAST_VERSION = "1";
@@ -84,7 +91,7 @@ public class JdbcCacheGeneratorTest
exception.expect(IllegalStateException.class);
exception.expectMessage(MISSING_JDB_DRIVER_JAR_MSG);
- target.generateCache(missingJarNamespace, KEY, LAST_VERSION, SCHEDULER);
+ target.generateCache(missingJarNamespace, KEY, LAST_VERSION,
CACHE_MANAGER.allocateCache());
}
@Test
@@ -100,7 +107,7 @@ public class JdbcCacheGeneratorTest
exception.expect(IllegalStateException.class);
exception.expectMessage(MISSING_JDB_DRIVER_JAR_MSG);
- target.generateCache(missingJarNamespace, KEY, LAST_VERSION, SCHEDULER);
+ target.generateCache(missingJarNamespace, KEY, LAST_VERSION,
CACHE_MANAGER.allocateCache());
}
@SuppressWarnings("SameParameterValue")
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
index 8118783..715da7b 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/NamespacedExtractorModuleTest.java
@@ -30,7 +30,9 @@ import
org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
+import
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Period;
@@ -57,6 +59,7 @@ public class NamespacedExtractorModuleTest
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private NamespaceExtractionCacheManager cacheManager;
@Before
public void setUp() throws Exception
@@ -75,10 +78,15 @@ public class NamespacedExtractorModuleTest
lifecycle = new Lifecycle();
lifecycle.start();
NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
+ cacheManager = new OnHeapNamespaceExtractionCacheManager(
+ lifecycle,
+ noopServiceEmitter,
+ new NamespaceExtractionConfig()
+ );
scheduler = new CacheScheduler(
noopServiceEmitter,
factoryMap,
- new OnHeapNamespaceExtractionCacheManager(lifecycle,
noopServiceEmitter, new NamespaceExtractionConfig())
+ cacheManager
);
}
@@ -108,9 +116,10 @@ public class NamespacedExtractorModuleTest
null,
null
);
- CacheScheduler.VersionedCache versionedCache =
factory.generateCache(namespace, null, null, scheduler);
- Assert.assertNotNull(versionedCache);
- Map<String, String> map = versionedCache.getCache();
+ CacheHandler cache = cacheManager.allocateCache();
+ String version = factory.generateCache(namespace, null, null, cache);
+ Assert.assertNotNull(version);
+ Map<String, String> map = cache.getCache();
Assert.assertEquals("bar", map.get("foo"));
Assert.assertEquals(null, map.get("baz"));
}
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
index bd88d4c..30687e4 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/StaticMapCacheGeneratorTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.server.lookup.namespace;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.query.lookup.namespace.StaticMapExtractionNamespace;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
+import
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import
org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.After;
@@ -39,6 +41,7 @@ public class StaticMapCacheGeneratorTest
private Lifecycle lifecycle;
private CacheScheduler scheduler;
+ private NamespaceExtractionCacheManager cacheManager;
@Before
public void setup() throws Exception
@@ -46,10 +49,15 @@ public class StaticMapCacheGeneratorTest
lifecycle = new Lifecycle();
lifecycle.start();
NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
+ cacheManager = new OnHeapNamespaceExtractionCacheManager(
+ lifecycle,
+ noopServiceEmitter,
+ new NamespaceExtractionConfig()
+ );
scheduler = new CacheScheduler(
noopServiceEmitter,
Collections.emptyMap(),
- new OnHeapNamespaceExtractionCacheManager(lifecycle,
noopServiceEmitter, new NamespaceExtractionConfig())
+ cacheManager
);
}
@@ -64,10 +72,11 @@ public class StaticMapCacheGeneratorTest
{
final StaticMapCacheGenerator factory = new StaticMapCacheGenerator();
final StaticMapExtractionNamespace namespace = new
StaticMapExtractionNamespace(MAP);
- CacheScheduler.VersionedCache versionedCache =
factory.generateCache(namespace, null, null, scheduler);
- Assert.assertNotNull(versionedCache);
- Assert.assertEquals(factory.getVersion(), versionedCache.getVersion());
- Assert.assertEquals(MAP, versionedCache.getCache());
+ CacheHandler cache = cacheManager.allocateCache();
+ final String version = factory.generateCache(namespace, null, null, cache);
+ Assert.assertNotNull(version);
+ Assert.assertEquals(factory.getVersion(), version);
+ Assert.assertEquals(MAP, cache.getCache());
}
@@ -76,6 +85,7 @@ public class StaticMapCacheGeneratorTest
{
final StaticMapCacheGenerator factory = new StaticMapCacheGenerator();
final StaticMapExtractionNamespace namespace = new
StaticMapExtractionNamespace(MAP);
- factory.generateCache(namespace, null, factory.getVersion(), scheduler);
+ CacheHandler cache = cacheManager.allocateCache();
+ factory.generateCache(namespace, null, factory.getVersion(), cache);
}
}
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
index 4846f89..58ef2dc 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/UriCacheGeneratorTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
+import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.server.lookup.namespace.cache.CacheSchedulerTest;
import
org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
@@ -170,72 +171,64 @@ public class UriCacheGeneratorTest
);
final List<Function<Lifecycle, NamespaceExtractionCacheManager>>
cacheManagerCreators = ImmutableList.of(
- new Function<Lifecycle, NamespaceExtractionCacheManager>()
- {
- @Override
- public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
- {
- return new OnHeapNamespaceExtractionCacheManager(
- lifecycle,
- new NoopServiceEmitter(),
- new NamespaceExtractionConfig()
- );
- }
- },
- new Function<Lifecycle, NamespaceExtractionCacheManager>()
- {
- @Override
- public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
- {
- return new OffHeapNamespaceExtractionCacheManager(
- lifecycle,
- new NoopServiceEmitter(),
- new NamespaceExtractionConfig()
- );
- }
- }
+ lifecycle -> new OnHeapNamespaceExtractionCacheManager(
+ lifecycle,
+ new NoopServiceEmitter(),
+ new NamespaceExtractionConfig()
+ ),
+ lifecycle -> new OffHeapNamespaceExtractionCacheManager(
+ lifecycle,
+ new NoopServiceEmitter(),
+ new NamespaceExtractionConfig()
+ )
);
- return new Iterable<Object[]>()
+ return () -> new Iterator<Object[]>()
{
+ Iterator<Object[]> compressionIt = compressionParams.iterator();
+ Iterator<Function<Lifecycle, NamespaceExtractionCacheManager>>
cacheManagerCreatorsIt =
+ cacheManagerCreators.iterator();
+ Object[] compressions = compressionIt.next();
+
@Override
- public Iterator<Object[]> iterator()
+ public boolean hasNext()
{
- return new Iterator<Object[]>()
- {
- Iterator<Object[]> compressionIt = compressionParams.iterator();
- Iterator<Function<Lifecycle, NamespaceExtractionCacheManager>>
cacheManagerCreatorsIt =
- cacheManagerCreators.iterator();
- Object[] compressions = compressionIt.next();
-
- @Override
- public boolean hasNext()
- {
- return compressionIt.hasNext() || cacheManagerCreatorsIt.hasNext();
- }
+ return compressionIt.hasNext() || cacheManagerCreatorsIt.hasNext();
+ }
- @Override
- public Object[] next()
- {
- if (cacheManagerCreatorsIt.hasNext()) {
- Function<Lifecycle, NamespaceExtractionCacheManager>
cacheManagerCreator = cacheManagerCreatorsIt.next();
- return new Object[]{compressions[0], compressions[1],
cacheManagerCreator};
- } else {
- cacheManagerCreatorsIt = cacheManagerCreators.iterator();
- compressions = compressionIt.next();
- return next();
- }
- }
+ @Override
+ public Object[] next()
+ {
+ if (cacheManagerCreatorsIt.hasNext()) {
+ Function<Lifecycle, NamespaceExtractionCacheManager>
cacheManagerCreator = cacheManagerCreatorsIt.next();
+ return new Object[]{compressions[0], compressions[1],
cacheManagerCreator};
+ } else {
+ cacheManagerCreatorsIt = cacheManagerCreators.iterator();
+ compressions = compressionIt.next();
+ return next();
+ }
+ }
- @Override
- public void remove()
- {
- throw new UOE("Cannot remove");
- }
- };
+ @Override
+ public void remove()
+ {
+ throw new UOE("Cannot remove");
}
};
}
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private final String suffix;
+
+ private final Function<File, OutputStream> outStreamSupplier;
+ private final Lifecycle lifecycle;
+ private final NamespaceExtractionCacheManager cacheManager;
+ private final CacheScheduler scheduler;
+ private File tmpFile;
+ private UriCacheGenerator generator;
+ private UriExtractionNamespace namespace;
+
public UriCacheGeneratorTest(
String suffix,
Function<File, OutputStream> outStreamSupplier,
@@ -245,30 +238,19 @@ public class UriCacheGeneratorTest
this.suffix = suffix;
this.outStreamSupplier = outStreamSupplier;
this.lifecycle = new Lifecycle();
+ this.cacheManager = cacheManagerCreator.apply(lifecycle);
this.scheduler = new CacheScheduler(
new NoopServiceEmitter(),
ImmutableMap.of(UriExtractionNamespace.class, new
UriCacheGenerator(FINDERS)),
- cacheManagerCreator.apply(lifecycle)
+ cacheManager
);
}
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- private final String suffix;
- private final Function<File, OutputStream> outStreamSupplier;
- private Lifecycle lifecycle;
- private CacheScheduler scheduler;
- private File tmpFile;
- private File tmpFileParent;
- private UriCacheGenerator generator;
- private UriExtractionNamespace namespace;
-
@Before
public void setUp() throws Exception
{
lifecycle.start();
- tmpFileParent = new File(temporaryFolder.newFolder(), "☃");
+ File tmpFileParent = new File(temporaryFolder.newFolder(), "☃");
Assert.assertTrue(tmpFileParent.mkdir());
Assert.assertTrue(tmpFileParent.isDirectory());
tmpFile = Files.createTempFile(tmpFileParent.toPath(),
"druidTestURIExtractionNS", suffix).toFile();
@@ -372,15 +354,16 @@ public class UriCacheGeneratorTest
{
Assert.assertEquals(0, scheduler.getActiveEntries());
- CacheScheduler.VersionedCache versionedCache =
generator.generateCache(namespace, null, null, scheduler);
- Assert.assertNotNull(versionedCache);
- Map<String, String> map = versionedCache.getCache();
+ CacheHandler cache = cacheManager.allocateCache();
+ String newVersion = generator.generateCache(namespace, null, null, cache);
+ Assert.assertNotNull(newVersion);
+ Map<String, String> map = cache.getCache();
Assert.assertEquals("bar", map.get("foo"));
Assert.assertEquals(null, map.get("baz"));
- String version = versionedCache.getVersion();
+ String version = newVersion;
Assert.assertNotNull(version);
- Assert.assertNull(generator.generateCache(namespace, null, version,
scheduler));
+ Assert.assertNull(generator.generateCache(namespace, null, version,
cacheManager.allocateCache()));
}
@Test(expected = FileNotFoundException.class)
@@ -395,7 +378,7 @@ public class UriCacheGeneratorTest
null
);
Assert.assertTrue(new File(namespace.getUri()).delete());
- generator.generateCache(badNamespace, null, null, scheduler);
+ generator.generateCache(badNamespace, null, null,
cacheManager.allocateCache());
}
@Test(expected = FileNotFoundException.class)
@@ -411,7 +394,7 @@ public class UriCacheGeneratorTest
null
);
Assert.assertTrue(new File(namespace.getUri()).delete());
- generator.generateCache(badNamespace, null, null, scheduler);
+ generator.generateCache(badNamespace, null, null,
cacheManager.allocateCache());
}
@Test(expected = IAE.class)
@@ -504,7 +487,7 @@ public class UriCacheGeneratorTest
null,
null
);
- Assert.assertNotNull(generator.generateCache(extractionNamespace, null,
null, scheduler));
+ Assert.assertNotNull(generator.generateCache(extractionNamespace, null,
null, cacheManager.allocateCache()));
}
@Test(timeout = 60_000L)
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
index ce2f257..51412d7 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
@@ -54,7 +54,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -134,24 +133,12 @@ public class CacheSchedulerTest
lifecycle.start();
cacheManager = createCacheManager.apply(lifecycle);
final Path tmpDir = temporaryFolder.newFolder().toPath();
- final CacheGenerator<UriExtractionNamespace> cacheGenerator = new
- CacheGenerator<UriExtractionNamespace>()
- {
- @Override
- public CacheScheduler.VersionedCache generateCache(
- final UriExtractionNamespace extractionNamespace,
- final CacheScheduler.EntryImpl<UriExtractionNamespace> id,
- final String lastVersion,
- final CacheScheduler scheduler
- ) throws InterruptedException
- {
- Thread.sleep(2); // To make absolutely sure there is a unique currentTimeMillis
- String version = Long.toString(System.currentTimeMillis());
- CacheScheduler.VersionedCache versionedCache =
scheduler.createVersionedCache(id, version);
- // Don't actually read off disk because TravisCI doesn't like that
- versionedCache.getCache().put(KEY, VALUE);
- return versionedCache;
- }
+ final CacheGenerator<UriExtractionNamespace> cacheGenerator =
(extractionNamespace, id, lastVersion, cache) -> {
+ Thread.sleep(2); // To make absolutely sure there is a unique currentTimeMillis
+ String version = Long.toString(System.currentTimeMillis());
+ // Don't actually read off disk because TravisCI doesn't like that
+ cache.getCache().put(KEY, VALUE);
+ return version;
};
scheduler = new CacheScheduler(
new NoopServiceEmitter(),
@@ -193,9 +180,7 @@ public class CacheSchedulerTest
);
CacheScheduler.Entry entry = scheduler.schedule(namespace);
waitFor(entry);
- Map<String, String> cache = entry.getCache();
- Assert.assertNull(cache.put("key", "val"));
- Assert.assertEquals("val", cache.get("key"));
+ Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}
@Test(timeout = 60_000L)
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
index c583b34..b6a3724 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
@@ -56,7 +56,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -126,160 +125,117 @@ public class JdbcExtractionNamespaceTest
setupTeardownService =
MoreExecutors.listeningDecorator(Execs.multiThreaded(2,
"JDBCExtractionNamespaceTeardown--%s"));
final ListenableFuture<Handle> setupFuture = setupTeardownService.submit(
- new Callable<Handle>()
- {
- @Override
- public Handle call()
- {
- final Handle handle =
derbyConnectorRule.getConnector().getDBI().open();
- Assert.assertEquals(
- 0,
- handle.createStatement(
- StringUtils.format(
- "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s
VARCHAR(64), %s VARCHAR(64))",
- TABLE_NAME,
- TS_COLUMN,
- FILTER_COLUMN,
- KEY_NAME,
- VAL_NAME
- )
- ).setQueryTimeout(1).execute()
- );
- handle.createStatement(StringUtils.format("TRUNCATE TABLE %s",
TABLE_NAME)).setQueryTimeout(1).execute();
- handle.commit();
- closer.register(new Closeable()
- {
- @Override
- public void close() throws IOException
- {
- handle.createStatement("DROP TABLE " +
TABLE_NAME).setQueryTimeout(1).execute();
- final ListenableFuture future =
setupTeardownService.submit(new Runnable()
- {
- @Override
- public void run()
- {
- handle.close();
- }
- });
- try (Closeable closeable = new Closeable()
- {
- @Override
- public void close()
- {
- future.cancel(true);
- }
- }) {
- future.get(10, TimeUnit.SECONDS);
- }
- catch (InterruptedException | ExecutionException |
TimeoutException e) {
- throw new IOException("Error closing handle", e);
- }
- }
- });
- closer.register(new Closeable()
+ () -> {
+ final Handle handle =
derbyConnectorRule.getConnector().getDBI().open();
+ Assert.assertEquals(
+ 0,
+ handle.createStatement(
+ StringUtils.format(
+ "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s
VARCHAR(64), %s VARCHAR(64))",
+ TABLE_NAME,
+ TS_COLUMN,
+ FILTER_COLUMN,
+ KEY_NAME,
+ VAL_NAME
+ )
+ ).setQueryTimeout(1).execute()
+ );
+ handle.createStatement(StringUtils.format("TRUNCATE TABLE %s",
TABLE_NAME)).setQueryTimeout(1).execute();
+ handle.commit();
+ closer.register(() -> {
+ handle.createStatement("DROP TABLE " +
TABLE_NAME).setQueryTimeout(1).execute();
+ final ListenableFuture future = setupTeardownService.submit(new
Runnable()
{
@Override
- public void close()
+ public void run()
{
- if (scheduler == null) {
- return;
- }
- Assert.assertEquals(0, scheduler.getActiveEntries());
+ handle.close();
}
});
- for (Map.Entry<String, String[]> entry : RENAMES.entrySet()) {
- try {
- String key = entry.getKey();
- String value = entry.getValue()[0];
- String filter = entry.getValue()[1];
- insertValues(handle, key, value, filter, "2015-01-01
00:00:00");
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
+ try (Closeable ignored = () -> future.cancel(true)) {
+ future.get(10, TimeUnit.SECONDS);
}
+ catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new IOException("Error closing handle", e);
+ }
+ });
+ closer.register(() -> {
+ if (scheduler == null) {
+ return;
+ }
+ Assert.assertEquals(0, scheduler.getActiveEntries());
+ });
+ for (Map.Entry<String, String[]> entry : RENAMES.entrySet()) {
+ try {
+ String key = entry.getKey();
+ String value = entry.getValue()[0];
+ String filter = entry.getValue()[1];
+ insertValues(handle, key, value, filter, "2015-01-01 00:00:00");
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
- NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
- scheduler = new CacheScheduler(
- noopServiceEmitter,
- ImmutableMap.of(
- JdbcExtractionNamespace.class,
- new CacheGenerator<JdbcExtractionNamespace>()
+ NoopServiceEmitter noopServiceEmitter = new NoopServiceEmitter();
+ scheduler = new CacheScheduler(
+ noopServiceEmitter,
+ ImmutableMap.of(
+ JdbcExtractionNamespace.class,
+ new CacheGenerator<JdbcExtractionNamespace>()
+ {
+ private final JdbcCacheGenerator delegate =
+ new JdbcCacheGenerator();
+
+ @Override
+ public String generateCache(
+ final JdbcExtractionNamespace namespace,
+ final
CacheScheduler.EntryImpl<JdbcExtractionNamespace> id,
+ final String lastVersion,
+ final CacheHandler cache
+ ) throws InterruptedException
{
- private final JdbcCacheGenerator delegate =
- new JdbcCacheGenerator();
-
- @Override
- public CacheScheduler.VersionedCache generateCache(
- final JdbcExtractionNamespace namespace,
- final
CacheScheduler.EntryImpl<JdbcExtractionNamespace> id,
- final String lastVersion,
- final CacheScheduler scheduler
- ) throws InterruptedException
- {
- updateLock.lockInterruptibly();
+ updateLock.lockInterruptibly();
+ try {
+ log.debug("Running cache generator");
try {
- log.debug("Running cache generator");
- try {
- return delegate.generateCache(namespace, id,
lastVersion, scheduler);
- }
- finally {
- updates.incrementAndGet();
- }
+ return delegate.generateCache(namespace, id,
lastVersion, cache);
}
finally {
- updateLock.unlock();
+ updates.incrementAndGet();
}
}
- }
- ),
- new OnHeapNamespaceExtractionCacheManager(
- lifecycle,
- noopServiceEmitter,
- new NamespaceExtractionConfig()
- )
- );
- try {
- lifecycle.start();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- closer.register(
- new Closeable()
- {
- @Override
- public void close() throws IOException
- {
- final ListenableFuture future =
setupTeardownService.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- lifecycle.stop();
- }
- }
- );
- try (final Closeable closeable = new Closeable()
- {
- @Override
- public void close()
- {
- future.cancel(true);
+ finally {
+ updateLock.unlock();
}
- }) {
- future.get(30, TimeUnit.SECONDS);
- }
- catch (InterruptedException | ExecutionException |
TimeoutException e) {
- throw new IOException("Error stopping lifecycle", e);
}
}
- }
- );
- return handle;
+ ),
+ new OnHeapNamespaceExtractionCacheManager(
+ lifecycle,
+ noopServiceEmitter,
+ new NamespaceExtractionConfig()
+ )
+ );
+ try {
+ lifecycle.start();
}
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ closer.register(
+ () -> {
+ final ListenableFuture future = setupTeardownService.submit(()
-> lifecycle.stop());
+ try (final Closeable ignored = () -> future.cancel(true)) {
+ future.get(30, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new IOException("Error stopping lifecycle", e);
+ }
+ }
+ );
+ return handle;
}
);
@@ -293,36 +249,26 @@ public class JdbcExtractionNamespaceTest
public void tearDown() throws InterruptedException, ExecutionException,
TimeoutException, IOException
{
final ListenableFuture<?> tearDownFuture = setupTeardownService.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- closer.close();
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
+ () -> {
+ try {
+ closer.close();
}
- }
- );
- try (final Closeable closeable = new Closeable()
- {
- @Override
- public void close() throws IOException
- {
- setupTeardownService.shutdownNow();
- try {
- if (!setupTeardownService.awaitTermination(60, TimeUnit.SECONDS)) {
- log.error("Tear down service didn't finish");
+ catch (IOException e) {
+ throw new RuntimeException(e);
}
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted", e);
+ );
+ try (final Closeable ignored = () -> {
+ setupTeardownService.shutdownNow();
+ try {
+ if (!setupTeardownService.awaitTermination(60, TimeUnit.SECONDS)) {
+ log.error("Tear down service didn't finish");
}
}
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted", e);
+ }
}) {
tearDownFuture.get(60, TimeUnit.SECONDS);
}
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
index 6342bf5..e497f18 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagersTest.java
@@ -82,21 +82,16 @@ public class NamespaceExtractionCacheManagersTest
try {
for (int i = 0; i < concurrentThreads; ++i) {
futures.add(service.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- thunder.await();
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- for (int i = 0; i < 1000; ++i) {
- CacheHandler cacheHandler = manager.createCache();
- cacheHandler.close();
- }
+ () -> {
+ try {
+ thunder.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ for (int i1 = 0; i1 < 1000; ++i1) {
+ CacheHandler cacheHandler = manager.createCache();
+ cacheHandler.close();
}
}
));
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
index 1210620..758f2ce 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
import org.junit.Assert;
@@ -40,6 +41,37 @@ public class OnHeapNamespaceExtractionCacheManagerTest
@Test
public void testInjection()
{
+ final NamespaceExtractionCacheManager manager = getCacheManager();
+ Assert.assertEquals(OnHeapNamespaceExtractionCacheManager.class,
manager.getClass());
+ }
+
+ @Test
+ public void testImmutableAfterAttach()
+ {
+ NamespaceExtractionCacheManager manager = getCacheManager();
+ CacheHandler handler = manager.allocateCache();
+ handler.getCache().put("some key", "some value");
+ CacheHandler immutableHandler = manager.attachCache(handler);
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> immutableHandler.getCache().put("other key", "other value")
+ );
+ }
+
+ @Test
+ public void testIllegalToDoubleAttach()
+ {
+ NamespaceExtractionCacheManager manager = getCacheManager();
+ CacheHandler handler = manager.createCache();
+ handler.getCache().put("some key", "some value");
+ Assert.assertThrows(
+ ISE.class,
+ () -> manager.attachCache(handler)
+ );
+ }
+
+ private NamespaceExtractionCacheManager getCacheManager()
+ {
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(
@@ -61,6 +93,6 @@ public class OnHeapNamespaceExtractionCacheManagerTest
properties.clear();
properties.put(NamespaceExtractionModule.TYPE_PREFIX, "onHeap");
final NamespaceExtractionCacheManager manager =
injector.getInstance(NamespaceExtractionCacheManager.class);
- Assert.assertEquals(OnHeapNamespaceExtractionCacheManager.class,
manager.getClass());
+ return manager;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]