This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e2fde8c516e Refactor lookups behavior while loading/dropping the
containers (#14806)
e2fde8c516e is described below
commit e2fde8c516e418e4d9711c8490b54318cdd2ccf7
Author: Pranav <[email protected]>
AuthorDate: Tue Nov 7 10:07:28 2023 -0800
Refactor lookups behavior while loading/dropping the containers (#14806)
---
.../extensions-core/lookups-cached-global.md | 1 +
.../query/lookup/KafkaLookupExtractorFactory.java | 12 ++
.../lookup/KafkaLookupExtractorFactoryTest.java | 2 +
.../lookup/NamespaceLookupExtractorFactory.java | 17 +++
.../lookup/namespace/ExtractionNamespace.java | 5 +
.../lookup/namespace/JdbcExtractionNamespace.java | 13 ++
.../lookup/namespace/cache/CacheScheduler.java | 6 +
.../NamespaceLookupExtractorFactoryTest.java | 36 +++++-
.../JdbcExtractionNamespaceUrlCheckTest.java | 16 ++-
.../lookup/namespace/JdbcCacheGeneratorTest.java | 1 +
.../lookup/namespace/cache/CacheSchedulerTest.java | 19 +++
.../cache/JdbcExtractionNamespaceTest.java | 6 +
.../druid/server/lookup/LoadingLookupFactory.java | 11 ++
.../druid/server/lookup/PollingLookupFactory.java | 11 ++
.../server/lookup/LoadingLookupFactoryTest.java | 2 +
.../server/lookup/PollingLookupFactoryTest.java | 2 +
.../druid/query/lookup/LookupExtractorFactory.java | 11 ++
.../LookupExtractorFactoryContainerTest.java | 11 ++
.../druid/query/lookup/LookupSegmentTest.java | 11 ++
.../query/lookup/LookupListeningResource.java | 6 +-
.../query/lookup/LookupReferencesManager.java | 123 +++++++++++++-----
.../query/lookup/MapLookupExtractorFactory.java | 11 ++
.../LookupEnabledTestExprMacroTable.java | 11 ++
.../query/lookup/LookupReferencesManagerTest.java | 137 +++++++++++++++++++--
.../lookup/RegisteredLookupExtractionFnTest.java | 10 ++
25 files changed, 446 insertions(+), 45 deletions(-)
diff --git a/docs/development/extensions-core/lookups-cached-global.md
b/docs/development/extensions-core/lookups-cached-global.md
index dc8827a5b36..13883c7aa79 100644
--- a/docs/development/extensions-core/lookups-cached-global.md
+++ b/docs/development/extensions-core/lookups-cached-global.md
@@ -353,6 +353,7 @@ The JDBC lookups will poll a database to populate its local
cache. If the `tsCol
|`tsColumn`| The column in `table` which contains when the key was
updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)|
|`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay
(actual value will be used as random from 0 to `jitterSeconds`), used to
distribute db load more evenly|No|0|
+|`loadTimeoutSeconds`| The maximum time (in seconds) allowed to query and
populate the lookup values. This is useful for lookup updates: on an update,
the service waits at most `loadTimeoutSeconds` for the new lookup to load and
keeps serving from the old lookup until the new one loads successfully. |No|120|
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup
should consume. If the lookup grows beyond this size, warning messages will be
logged in the respective service logs.|No|10% of JVM heap size|
```json
diff --git
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index 6d5d393a0bf..049f33f40c8 100644
---
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -297,6 +297,18 @@ public class KafkaLookupExtractorFactory implements
LookupExtractorFactory
return new KafkaLookupExtractorIntrospectionHandler(this);
}
+ @Override
+ public void awaitInitialization()
+ {
+ // Kafka lookups do not need to await initialization because they are
realtime Kafka lookups.
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
@Override
public LookupExtractor get()
{
diff --git
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index 6c21990bb52..24a7b481b22 100644
---
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -83,11 +83,13 @@ public class KafkaLookupExtractorFactoryTest
mapper.writeValueAsString(expected),
KafkaLookupExtractorFactory.class
);
+ result.awaitInitialization();
Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic());
Assert.assertEquals(expected.getKafkaProperties(),
result.getKafkaProperties());
Assert.assertEquals(cacheManager, result.getCacheManager());
Assert.assertEquals(0, expected.getCompletedEventCount());
Assert.assertEquals(0, result.getCompletedEventCount());
+ Assert.assertTrue(result.isInitialized());
}
@Test
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java
index a89727f643a..ce3121679ad 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -169,6 +170,22 @@ public class NamespaceLookupExtractorFactory implements
LookupExtractorFactory
return lookupIntrospectHandler;
}
+ @Override
+ public void awaitInitialization() throws InterruptedException,
TimeoutException
+ {
+ long timeout = extractionNamespace.getLoadTimeoutMills();
+ if (entry.getCacheState() == CacheScheduler.NoCache.CACHE_NOT_INITIALIZED)
{
+ LOG.info("Cache not initialized yet for namespace %s waiting for %s
mills", extractionNamespace, timeout);
+ entry.awaitTotalUpdatesWithTimeout(1, timeout);
+ }
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return entry.getCacheState() instanceof CacheScheduler.VersionedCache;
+ }
+
@JsonProperty
public ExtractionNamespace getExtractionNamespace()
{
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java
index c52021bd18f..ba910d3755d 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java
@@ -49,4 +49,9 @@ public interface ExtractionNamespace
{
return 0;
}
+
+ default long getLoadTimeoutMills()
+ {
+ return 60 * 1000;
+ }
}
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
index 32ceccd1a82..0c1cd427e54 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
@@ -45,6 +45,7 @@ public class JdbcExtractionNamespace implements
ExtractionNamespace
private static final Logger LOG = new Logger(JdbcExtractionNamespace.class);
long DEFAULT_MAX_HEAP_PERCENTAGE = 10L;
+ long DEFAULT_LOOKUP_LOAD_TIME_SECONDS = 120;
@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
@@ -63,6 +64,8 @@ public class JdbcExtractionNamespace implements
ExtractionNamespace
@JsonProperty
private final long maxHeapPercentage;
@JsonProperty
+ private final long loadTimeoutSeconds;
+ @JsonProperty
private final int jitterSeconds;
@JsonCreator
@@ -77,6 +80,7 @@ public class JdbcExtractionNamespace implements
ExtractionNamespace
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period
pollPeriod,
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long
maxHeapPercentage,
@JsonProperty(value = "jitterSeconds") @Nullable Integer jitterSeconds,
+ @JsonProperty(value = "loadTimeoutSeconds") @Nullable final Long
loadTimeoutSeconds,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
@@ -101,6 +105,7 @@ public class JdbcExtractionNamespace implements
ExtractionNamespace
}
this.jitterSeconds = jitterSeconds == null ? 0 : jitterSeconds;
this.maxHeapPercentage = maxHeapPercentage == null ?
DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
+ this.loadTimeoutSeconds = loadTimeoutSeconds == null ?
DEFAULT_LOOKUP_LOAD_TIME_SECONDS : loadTimeoutSeconds;
}
/**
@@ -176,6 +181,12 @@ public class JdbcExtractionNamespace implements
ExtractionNamespace
return 1000L * ThreadLocalRandom.current().nextInt(jitterSeconds + 1);
}
+ @Override
+ public long getLoadTimeoutMills()
+ {
+ return 1000L * loadTimeoutSeconds;
+ }
+
@Override
public String toString()
{
@@ -187,6 +198,8 @@ public class JdbcExtractionNamespace implements
ExtractionNamespace
", tsColumn='" + tsColumn + '\'' +
", filter='" + filter + '\'' +
", pollPeriod=" + pollPeriod +
+ ", jitterSeconds=" + jitterSeconds +
+ ", loadTimeoutSeconds=" + loadTimeoutSeconds +
", maxHeapPercentage=" + maxHeapPercentage +
'}';
}
diff --git
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
index 61e580563f8..30fa710a4b5 100644
---
a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
+++
b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
@@ -115,6 +115,12 @@ public final class CacheScheduler
impl.updateCounter.awaitCount(totalUpdates);
}
+ @VisibleForTesting
+ public void awaitTotalUpdatesWithTimeout(int totalUpdates, long
timeoutMills)
+ throws InterruptedException, TimeoutException
+ {
+ impl.updateCounter.awaitCount(totalUpdates, timeoutMills,
TimeUnit.MILLISECONDS);
+ }
@VisibleForTesting
void awaitNextUpdates(int nextUpdates) throws InterruptedException
{
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java
index 3515ebe0623..1aa4ef1dfe4 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java
@@ -59,6 +59,7 @@ import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -227,7 +228,6 @@ public class NamespaceLookupExtractorFactoryTest
);
Assert.assertTrue(namespaceLookupExtractorFactory.start());
Assert.assertTrue(namespaceLookupExtractorFactory.start());
-
verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}
@@ -287,6 +287,40 @@ public class NamespaceLookupExtractorFactoryTest
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}
+ @Test
+ public void testAwaitInitializationOnCacheNotInitialized() throws Exception
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLoadTimeoutMills()
+ {
+ return 1;
+ }
+ };
+ expectScheduleAndWaitOnce(extractionNamespace);
+
when(entry.getCacheState()).thenReturn(CacheScheduler.NoCache.CACHE_NOT_INITIALIZED);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory =
new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ scheduler
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ namespaceLookupExtractorFactory.awaitInitialization();
+ Assert.assertThrows(ISE.class, () ->
namespaceLookupExtractorFactory.get());
+ verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
+ verify(entry, times(2)).getCacheState();
+ verify(entry).awaitTotalUpdatesWithTimeout(1, 1);
+ Thread.sleep(10);
+ verifyNoMoreInteractions(scheduler, entry, versionedCache);
+ }
+
private void expectScheduleAndWaitOnce(ExtractionNamespace
extractionNamespace)
{
try {
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
index f4fffef5fff..178abca9c49 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
@@ -63,7 +63,9 @@ public class JdbcExtractionNamespaceUrlCheckTest
"some filter",
new Period(10),
null,
- 0, new JdbcAccessSecurityConfig()
+ 0,
+ 1000L,
+ new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
@@ -102,6 +104,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -139,6 +142,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -178,6 +182,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -221,6 +226,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -260,6 +266,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
10L,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -296,7 +303,9 @@ public class JdbcExtractionNamespaceUrlCheckTest
"some filter",
new Period(10),
null,
- 0, new JdbcAccessSecurityConfig()
+ 0,
+ null,
+ new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
@@ -335,6 +344,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -380,6 +390,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -423,6 +434,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
index 1eb74630fda..7162eac0a2d 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
@@ -138,6 +138,7 @@ public class JdbcCacheGeneratorTest
Period.ZERO,
null,
0,
+ null,
new JdbcAccessSecurityConfig()
);
}
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
index fd96529ae99..fb4d6070e3e 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
@@ -191,6 +191,24 @@ public class CacheSchedulerTest
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}
+ @Test(timeout = 60_000L)
+ public void testInitialization() throws InterruptedException,
TimeoutException
+ {
+ UriExtractionNamespace namespace = new UriExtractionNamespace(
+ tmpFile.toURI(),
+ null, null,
+ new UriExtractionNamespace.ObjectMapperFlatDataParser(
+ UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
+ ),
+ new Period(0),
+ null,
+ null
+ );
+ CacheScheduler.Entry entry = scheduler.schedule(namespace);
+ entry.awaitTotalUpdatesWithTimeout(1, 2000);
+ Assert.assertEquals(VALUE, entry.getCache().get(KEY));
+ }
+
@Test(timeout = 60_000L)
public void testPeriodicUpdatesScheduled() throws InterruptedException
{
@@ -459,6 +477,7 @@ public class CacheSchedulerTest
new Period(10_000),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
{
@Override
diff --git
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
index e0c651724d7..cf71a5da490 100644
---
a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
+++
b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
@@ -329,6 +329,7 @@ public class JdbcExtractionNamespaceTest
new Period(0),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace))
{
@@ -363,6 +364,7 @@ public class JdbcExtractionNamespaceTest
new Period(0),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace))
{
@@ -414,6 +416,7 @@ public class JdbcExtractionNamespaceTest
new Period(0),
null,
120,
+ null,
new JdbcAccessSecurityConfig()
);
long jitter = extractionNamespace.getJitterMills();
@@ -433,6 +436,7 @@ public class JdbcExtractionNamespaceTest
FILTER_COLUMN + "='1'",
new Period(0),
null,
+ 0,
null,
new JdbcAccessSecurityConfig()
);
@@ -478,6 +482,7 @@ public class JdbcExtractionNamespaceTest
new Period(10),
null,
0,
+ null,
securityConfig
);
final ObjectMapper mapper = new DefaultObjectMapper();
@@ -504,6 +509,7 @@ public class JdbcExtractionNamespaceTest
new Period(10),
null,
0,
+ null,
new JdbcAccessSecurityConfig()
);
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
diff --git
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java
index 5746c695239..f7267c7dbfb 100644
---
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java
+++
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java
@@ -111,6 +111,17 @@ public class LoadingLookupFactory implements
LookupExtractorFactory
return null;
}
+ @Override
+ public void awaitInitialization()
+ {
+ // LoadingLookupFactory does not have any initialization period as it
fetches the key from loadingCache and DataFetcher as necessary.
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
@Override
public LoadingLookup get()
{
diff --git
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java
index 48cdc47f6cc..63ab7356be8 100644
---
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java
+++
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java
@@ -128,6 +128,17 @@ public class PollingLookupFactory implements
LookupExtractorFactory
return null;
}
+ @Override
+ public void awaitInitialization()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
@Override
public PollingLookup get()
{
diff --git
a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java
b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java
index be6acd56139..cf67e6e32ea 100644
---
a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java
+++
b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java
@@ -57,6 +57,8 @@ public class LoadingLookupFactoryTest
EasyMock.expectLastCall().once();
EasyMock.replay(loadingLookup);
Assert.assertTrue(loadingLookupFactory.start());
+ loadingLookupFactory.awaitInitialization();
+ Assert.assertTrue(loadingLookupFactory.isInitialized());
Assert.assertTrue(loadingLookupFactory.close());
EasyMock.verify(loadingLookup);
diff --git
a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java
b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java
index 39bfc41a619..510839ffc3f 100644
---
a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java
+++
b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java
@@ -36,6 +36,8 @@ public class PollingLookupFactoryTest
EasyMock.expect(pollingLookup.isOpen()).andReturn(true).once();
EasyMock.replay(pollingLookup);
Assert.assertTrue(pollingLookupFactory.start());
+ pollingLookupFactory.awaitInitialization();
+ Assert.assertTrue(pollingLookupFactory.isInitialized());
EasyMock.verify(pollingLookup);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java
b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java
index 8d4ee42c5bb..80e1189ae6a 100644
---
a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Supplier;
import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
/**
* Users of Lookup Extraction need to implement a {@link
LookupExtractorFactory} supplier of type {@link LookupExtractor}.
@@ -79,4 +80,14 @@ public interface LookupExtractorFactory extends
Supplier<LookupExtractor>
*/
@Nullable
LookupIntrospectHandler getIntrospectHandler();
+
+ /**
+ * Blocks and waits for the cache to initialize fully.
+ */
+ void awaitInitialization() throws InterruptedException, TimeoutException;
+
+ /**
+ * @return true if cache is loaded and lookup is queryable else returns false
+ */
+ boolean isInitialized();
}
diff --git
a/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java
b/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java
index 500093fbd36..727fdf52dd3 100644
---
a/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java
@@ -104,6 +104,17 @@ public class LookupExtractorFactoryContainerTest
return null;
}
+ @Override
+ public void awaitInitialization()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
@Override
public LookupExtractor get()
{
diff --git
a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
index cc5630b9331..2d36ffa1e63 100644
---
a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
+++
b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
@@ -82,6 +82,17 @@ public class LookupSegmentTest
throw new UnsupportedOperationException("not needed for this test");
}
+ @Override
+ public void awaitInitialization()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
@Override
public LookupExtractor get()
{
diff --git
a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
index 01f7d684831..53072b96b10 100644
---
a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
+++
b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
@@ -94,7 +94,9 @@ class LookupListeningResource extends ListenerResource
try {
state.getToLoad().forEach(manager::add);
- state.getToDrop().forEach(manager::remove);
+ state.getToDrop().forEach(lookName -> {
+ manager.remove(lookName,
state.getToLoad().getOrDefault(lookName, null));
+ });
return
Response.status(Response.Status.ACCEPTED).entity(manager.getAllLookupsState()).build();
}
@@ -135,7 +137,7 @@ class LookupListeningResource extends ListenerResource
@Override
public Object delete(String id)
{
- manager.remove(id);
+ manager.remove(id, null);
return id;
}
}
diff --git
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index 3cdaec0d4a3..03879b0eaea 100644
---
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -66,6 +66,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
@@ -117,6 +118,8 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
private final LookupConfig lookupConfig;
+ private ExecutorService lookupUpdateExecutorService;
+
@Inject
public LookupReferencesManager(
LookupConfig lookupConfig,
@@ -147,6 +150,10 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig;
this.lookupConfig = lookupConfig;
this.testMode = testMode;
+ this.lookupUpdateExecutorService = Execs.multiThreaded(
+ lookupConfig.getNumLookupLoadingThreads(),
+ "LookupExtractorFactoryContainerProvider-Update-%s"
+ );
}
@LifecycleStart
@@ -217,7 +224,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
Map<String, LookupExtractorFactoryContainer> lookupMap = new
HashMap<>(swappedState.lookupMap);
for (Notice notice : swappedState.noticesBeingHandled) {
try {
- notice.handle(lookupMap);
+ notice.handle(lookupMap, this);
}
catch (Exception ex) {
LOG.error(ex, "Exception occurred while handling lookup notice [%s].",
notice);
@@ -266,7 +273,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
LOG.error(ex, "Failed to close lookup [%s].", e.getKey());
}
}
-
+ lookupUpdateExecutorService.shutdown();
LOG.debug("LookupExtractorFactoryContainerProvider is stopped.");
}
@@ -277,10 +284,10 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer,
lookupConfig.getLookupStartRetries()));
}
- public void remove(String lookupName)
+ public void remove(String lookupName, LookupExtractorFactoryContainer
loadedContainer)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1,
TimeUnit.MILLISECONDS));
- addNotice(new DropNotice(lookupName));
+ addNotice(new DropNotice(lookupName, loadedContainer));
}
private void addNotice(Notice notice)
@@ -301,6 +308,11 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
LockSupport.unpark(mainThread);
}
+ public void submitAsyncLookupTask(Runnable task)
+ {
+ lookupUpdateExecutorService.submit(task);
+ }
+
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
@@ -595,11 +607,24 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
)
);
}
-
+ private void dropContainer(LookupExtractorFactoryContainer container, String
lookupName)
+ {
+ if (container != null) {
+ LOG.debug("Removed lookup [%s] with spec [%s].", lookupName, container);
+
+ if (!container.getLookupExtractorFactory().destroy()) {
+ throw new ISE(
+ "destroy method returned false for lookup [%s]:[%s]",
+ lookupName,
+ container
+ );
+ }
+ }
+ }
@VisibleForTesting
interface Notice
{
- void handle(Map<String, LookupExtractorFactoryContainer> lookupMap) throws
Exception;
+ void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager) throws Exception;
}
private static class LoadNotice implements Notice
@@ -616,7 +641,8 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
@Override
- public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap)
throws Exception
+ public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager)
+ throws Exception
{
LookupExtractorFactoryContainer old = lookupMap.get(lookupName);
if (old != null && !lookupExtractorFactoryContainer.replaces(old)) {
@@ -642,18 +668,45 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
e -> true,
startRetries
);
-
- old = lookupMap.put(lookupName, lookupExtractorFactoryContainer);
-
- LOG.debug("Loaded lookup [%s] with spec [%s].", lookupName,
lookupExtractorFactoryContainer);
-
- if (old != null) {
- if (!old.getLookupExtractorFactory().destroy()) {
- throw new ISE("destroy method returned false for lookup [%s]:[%s]",
lookupName, old);
- }
+ if
(lookupExtractorFactoryContainer.getLookupExtractorFactory().isInitialized()) {
+ old = lookupMap.put(lookupName, lookupExtractorFactoryContainer);
+ LOG.debug("Loaded lookup [%s] with spec [%s].", lookupName,
lookupExtractorFactoryContainer);
+ manager.dropContainer(old, lookupName);
+ return;
}
+ manager.submitAsyncLookupTask(() -> {
+ try {
+ /*
+ Retry startRetries times and wait for first cache to load for new
container,
+ if loaded then kill old container and start serving from new one.
+ If new lookupExtractorFactoryContainer has errors in loading, kill the
new container and do not remove the old container
+ */
+ RetryUtils.retry(
+ () -> {
+
lookupExtractorFactoryContainer.getLookupExtractorFactory().awaitInitialization();
+ return null;
+ }, e -> true,
+ startRetries
+ );
+ if
(lookupExtractorFactoryContainer.getLookupExtractorFactory().isInitialized()) {
+ // send load notice with cache loaded container
+ manager.add(lookupName, lookupExtractorFactoryContainer);
+ } else {
+ // drop the new container as it is still not initialized after the wait
+ manager.dropContainer(lookupExtractorFactoryContainer, lookupName);
+ }
+ }
+ catch (Exception e) {
+ // drop new failed container and continue serving old one
+ LOG.error(
+ e,
+ "Exception in updating the namespace %s, continue serving from
old container and killing new container ",
+ lookupExtractorFactoryContainer
+ );
+ manager.dropContainer(lookupExtractorFactoryContainer, lookupName);
+ }
+ });
}
-
@Override
public String toString()
{
@@ -667,28 +720,36 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
private static class DropNotice implements Notice
{
private final String lookupName;
+ private final LookupExtractorFactoryContainer loadedContainer;
- DropNotice(String lookupName)
+ /**
+ * @param lookupName Name of the lookup to drop
+ * @param loadedContainer Reference to the newly loaded container; this is
mandatory in the update lookup call and can be null in a pure drop call.
+ */
+ DropNotice(String lookupName, @Nullable LookupExtractorFactoryContainer
loadedContainer)
{
this.lookupName = lookupName;
+ this.loadedContainer = loadedContainer;
}
@Override
- public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap)
+ public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager)
{
- final LookupExtractorFactoryContainer lookupExtractorFactoryContainer =
lookupMap.remove(lookupName);
-
- if (lookupExtractorFactoryContainer != null) {
- LOG.debug("Removed lookup [%s] with spec [%s].", lookupName,
lookupExtractorFactoryContainer);
-
- if
(!lookupExtractorFactoryContainer.getLookupExtractorFactory().destroy()) {
- throw new ISE(
- "destroy method returned false for lookup [%s]:[%s]",
- lookupName,
- lookupExtractorFactoryContainer
- );
- }
+ if (loadedContainer != null &&
!loadedContainer.getLookupExtractorFactory().isInitialized()) {
+ final LookupExtractorFactoryContainer containterToDrop =
lookupMap.get(lookupName);
+ manager.submitAsyncLookupTask(() -> {
+ try {
+ loadedContainer.getLookupExtractorFactory().awaitInitialization();
+ manager.dropContainer(containterToDrop, lookupName);
+ }
+ catch (InterruptedException | TimeoutException e) {
+ // do nothing as loadedContainer is dropped by LoadNotice handler
eventually if cache is not loaded
+ }
+ });
+ return;
}
+ final LookupExtractorFactoryContainer lookupExtractorFactoryContainer =
lookupMap.remove(lookupName);
+ manager.dropContainer(lookupExtractorFactoryContainer, lookupName);
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java
b/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java
index 3f39e149ba3..74f68b20a5d 100644
---
a/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java
+++
b/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java
@@ -87,6 +87,17 @@ public class MapLookupExtractorFactory implements
LookupExtractorFactory
return lookupIntrospectHandler;
}
+ @Override
+ public void awaitInitialization()
+ {
+
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
@Override
public LookupExtractor get()
{
diff --git
a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
index 07f225d7eef..c5c399e8b4c 100644
---
a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
+++
b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
@@ -98,6 +98,17 @@ public class LookupEnabledTestExprMacroTable extends
ExprMacroTable
throw new UnsupportedOperationException();
}
+ @Override
+ public void awaitInitialization()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
@Override
public LookupExtractor get()
{
diff --git
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
index 8cc6956d117..855b761ec16 100644
---
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
+++
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
public class LookupReferencesManagerTest
{
@@ -149,7 +150,7 @@ public class LookupReferencesManagerTest
@Test(expected = IllegalStateException.class)
public void testRemoveExceptionWhenClosed()
{
- lookupReferencesManager.remove("test");
+ lookupReferencesManager.remove("test", null);
}
@Test(expected = IllegalStateException.class)
@@ -164,6 +165,7 @@ public class LookupReferencesManagerTest
LookupExtractorFactory lookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
+
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
Map<String, Object> lookupMap = new HashMap<>();
@@ -193,18 +195,132 @@ public class LookupReferencesManagerTest
Assert.assertEquals(Optional.of(testContainer),
lookupReferencesManager.get("test"));
- lookupReferencesManager.remove("test");
+ lookupReferencesManager.remove("test", testContainer);
lookupReferencesManager.handlePendingNotices();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
}
+ @Test
+ public void testLoadBadContaineAfterOldGoodContainer() throws Exception
+ {
+ // Test the scenario of not loading the new container until it gets
initialized
+ LookupExtractorFactory lookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
+ EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
+ EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
+
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
+ EasyMock.replay(lookupExtractorFactory);
+
+ Map<String, Object> lookupMap = new HashMap<>();
+ lookupMap.put("testMockForAddGetRemove", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
+ EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
+ EasyMock.replay(config);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
+ lookupReferencesManager.start();
+ Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
+
+ LookupExtractorFactoryContainer testContainer = new
LookupExtractorFactoryContainer("0", lookupExtractorFactory);
+
+ lookupReferencesManager.add("test", testContainer);
+ lookupReferencesManager.handlePendingNotices();
+
+ Assert.assertEquals(Optional.of(testContainer),
lookupReferencesManager.get("test"));
+
+ LookupExtractorFactory badLookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
+
EasyMock.expect(badLookupExtractorFactory.start()).andReturn(false).anyTimes();
+ badLookupExtractorFactory.awaitInitialization();
+ EasyMock.expectLastCall().andThrow(new TimeoutException());
+
EasyMock.expect(badLookupExtractorFactory.destroy()).andReturn(true).once();
+
EasyMock.expect(badLookupExtractorFactory.isInitialized()).andReturn(false).anyTimes();
+ EasyMock.replay(badLookupExtractorFactory);
+ LookupExtractorFactoryContainer badContainer = new
LookupExtractorFactoryContainer("0", badLookupExtractorFactory);
+ lookupReferencesManager.add("test", badContainer);
+
+ lookupReferencesManager.handlePendingNotices();
+
+ Assert.assertEquals(Optional.of(testContainer),
lookupReferencesManager.get("test"));
+
+ lookupReferencesManager.remove("test", testContainer);
+ lookupReferencesManager.handlePendingNotices();
+
+ Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
+ }
+
+ @Test
+ public void testDropOldContainerAfterNewLoadGoodContainer() throws Exception
+ {
+ // Test the scenario of dropping the current container only when new
container gets initialized
+ LookupExtractorFactory lookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
+ EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
+ EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
+
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
+ EasyMock.replay(lookupExtractorFactory);
+
+ Map<String, Object> lookupMap = new HashMap<>();
+ lookupMap.put("testMockForAddGetRemove", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
+ EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
+ EasyMock.replay(config);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
+ lookupReferencesManager.start();
+ Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
+
+ LookupExtractorFactoryContainer testContainer = new
LookupExtractorFactoryContainer("0", lookupExtractorFactory);
+
+ lookupReferencesManager.add("test", testContainer);
+ lookupReferencesManager.handlePendingNotices();
+
+ Assert.assertEquals(Optional.of(testContainer),
lookupReferencesManager.get("test"));
+
+ LookupExtractorFactory badLookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
+
EasyMock.expect(badLookupExtractorFactory.start()).andReturn(false).anyTimes();
+ badLookupExtractorFactory.awaitInitialization();
+ EasyMock.expectLastCall().andThrow(new TimeoutException());
+
EasyMock.expect(badLookupExtractorFactory.destroy()).andReturn(true).once();
+
EasyMock.expect(badLookupExtractorFactory.isInitialized()).andReturn(false).anyTimes();
+ EasyMock.replay(badLookupExtractorFactory);
+ LookupExtractorFactoryContainer badContainer = new
LookupExtractorFactoryContainer("0", badLookupExtractorFactory);
+ lookupReferencesManager.remove("test", badContainer); // new container to
load is badContainer here
+
+ lookupReferencesManager.handlePendingNotices();
+
+ Assert.assertEquals(Optional.of(testContainer),
lookupReferencesManager.get("test"));
+
+ lookupReferencesManager.remove("test", testContainer);
+ lookupReferencesManager.handlePendingNotices();
+
+ Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
+ }
@Test
public void testCloseIsCalledAfterStopping() throws Exception
{
- LookupExtractorFactory lookupExtractorFactory =
EasyMock.createStrictMock(LookupExtractorFactory.class);
+ LookupExtractorFactory lookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
+
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForCloseIsCalledAfterStopping", container);
@@ -234,7 +350,8 @@ public class LookupReferencesManagerTest
@Test
public void testDestroyIsCalledAfterRemove() throws Exception
{
- LookupExtractorFactory lookupExtractorFactory =
EasyMock.createStrictMock(LookupExtractorFactory.class);
+ LookupExtractorFactory lookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
+
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
EasyMock.replay(lookupExtractorFactory);
@@ -256,11 +373,12 @@ public class LookupReferencesManagerTest
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
+ LookupExtractorFactoryContainer container = new
LookupExtractorFactoryContainer("0", lookupExtractorFactory);
lookupReferencesManager.start();
- lookupReferencesManager.add("testMock", new
LookupExtractorFactoryContainer("0", lookupExtractorFactory));
+ lookupReferencesManager.add("testMock", container);
lookupReferencesManager.handlePendingNotices();
- lookupReferencesManager.remove("testMock");
+ lookupReferencesManager.remove("testMock", container);
lookupReferencesManager.handlePendingNotices();
EasyMock.verify(lookupExtractorFactory);
@@ -385,7 +503,7 @@ public class LookupReferencesManagerTest
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
- lookupReferencesManager.remove("test");
+ lookupReferencesManager.remove("test", null);
lookupReferencesManager.handlePendingNotices();
}
@@ -480,7 +598,7 @@ public class LookupReferencesManagerTest
lookupReferencesManager.add("one", container1);
lookupReferencesManager.add("two", container2);
lookupReferencesManager.handlePendingNotices();
- lookupReferencesManager.remove("one");
+ lookupReferencesManager.remove("one", container1);
lookupReferencesManager.add("three", container3);
LookupsState state = lookupReferencesManager.getAllLookupsState();
@@ -526,6 +644,7 @@ public class LookupReferencesManagerTest
LookupExtractorFactory lookupExtractorFactory =
EasyMock.createMock(LookupExtractorFactory.class);
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
+
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -541,7 +660,7 @@ public class LookupReferencesManagerTest
lookupReferencesManager.getAllLookupNames()
);
- lookupReferencesManager.remove("test");
+ lookupReferencesManager.remove("test", null);
while (lookupReferencesManager.get("test").isPresent()) {
Thread.sleep(100);
diff --git
a/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
b/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
index 5e877e8288c..532c1213e26 100644
---
a/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
+++
b/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
@@ -282,6 +282,16 @@ public class RegisteredLookupExtractionFnTest
return null;
}
+ @Override
+ public void awaitInitialization()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
@Override
public LookupExtractor get()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]