This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b53e08cf1b5 Revert "Use CoordinatorClient for fetching tier lookup
(#18142)" (#18189)
b53e08cf1b5 is described below
commit b53e08cf1b5c34dbca391c1f538ebe9b8dce3579
Author: Uddeshya Singh <[email protected]>
AuthorDate: Wed Jul 2 18:35:25 2025 +0530
Revert "Use CoordinatorClient for fetching tier lookup (#18142)" (#18189)
This reverts commit 6b3cb50cdd8fc8c3dd9db2b55351758634c975a5.
---
.../client/coordinator/CoordinatorClient.java | 11 -
.../client/coordinator/CoordinatorClientImpl.java | 36 --
.../client/coordinator/NoopCoordinatorClient.java | 10 -
.../query/lookup/LookupReferencesManager.java | 98 +++---
.../coordinator/CoordinatorClientImplTest.java | 57 +---
.../query/lookup/LookupReferencesManagerTest.java | 365 ++++++++++++++++-----
6 files changed, 346 insertions(+), 231 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index b4c27991bb5..7c909a2722a 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.compaction.CompactionStatusResponse;
@@ -33,7 +32,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public interface CoordinatorClient
@@ -103,13 +101,4 @@ public interface CoordinatorClient
* API: {@code POST /druid/coordinator/v1/config}
*/
ListenableFuture<Void>
updateCoordinatorDynamicConfig(CoordinatorDynamicConfig dynamicConfig);
-
- /**
- * Gets the lookup configuration for a tier.
- * <p>
- * API: {@code GET /druid/coordinator/v1/lookups/config/<tier>}
- *
- * @param tier The name of the tier for which the lookup configuration is to
be fetched.
- */
- ListenableFuture<Map<String, LookupExtractorFactoryContainer>>
fetchLookupsForTier(String tier);
}
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
index 9597b90a8ce..4ad2b270d14 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
@@ -30,11 +30,8 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
-import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupUtils;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
@@ -50,7 +47,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public class CoordinatorClientImpl implements CoordinatorClient
@@ -254,36 +250,4 @@ public class CoordinatorClientImpl implements
CoordinatorClient
IgnoreHttpResponseHandler.INSTANCE
);
}
-
- @Override
- public ListenableFuture<Map<String, LookupExtractorFactoryContainer>>
fetchLookupsForTier(
- String tier
- )
- {
- final String path = StringUtils.format(
- "/druid/coordinator/v1/lookups/config/%s?detailed=true",
- StringUtils.urlEncode(tier)
- );
-
- return FutureUtils.transform(
- client.asyncRequest(
- new RequestBuilder(HttpMethod.GET, path),
- new BytesFullResponseHandler()
- ),
- this::extractLookupFactory
- );
- }
-
- private Map<String, LookupExtractorFactoryContainer>
extractLookupFactory(BytesFullResponseHolder holder)
- {
- Map<String, Object> lookupNameToGenericConfig = JacksonUtils.readValue(
- jsonMapper,
- holder.getContent(),
- new TypeReference<>() {}
- );
- return LookupUtils.tryConvertObjectMapToLookupConfigMap(
- lookupNameToGenericConfig,
- jsonMapper
- );
- }
}
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
index a5b8a3cf369..24ec6c8ff03 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.compaction.CompactionStatusResponse;
@@ -33,7 +32,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public class NoopCoordinatorClient implements CoordinatorClient
@@ -104,12 +102,4 @@ public class NoopCoordinatorClient implements
CoordinatorClient
{
throw new UnsupportedOperationException();
}
-
- @Override
- public ListenableFuture<Map<String, LookupExtractorFactoryContainer>>
fetchLookupsForTier(
- String tier
- )
- {
- throw new UnsupportedOperationException();
- }
}
diff --git
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index 0af00cb6153..ae8a2f41acd 100644
---
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -20,31 +20,34 @@
package org.apache.druid.query.lookup;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.rpc.HttpResponseException;
+import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
+import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
@@ -75,7 +78,7 @@ import java.util.stream.Collectors;
* This class provide a basic {@link LookupExtractorFactory} references
manager. It allows basic operations fetching,
* listing, adding and deleting of {@link LookupExtractor} objects, and can
take periodic snap shot of the loaded lookup
* extractor specifications in order to bootstrap nodes after restart.
- * <p>
+ *
* It also implements {@link LookupExtractorFactoryContainerProvider}, to
supply queries and indexing transformations
* with a reference to a {@link LookupExtractorFactoryContainer}. This class
is a companion of
* {@link org.apache.druid.server.lookup.cache.LookupCoordinatorManager},
which communicates with
@@ -86,6 +89,9 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
{
private static final EmittingLogger LOG = new
EmittingLogger(LookupReferencesManager.class);
+ private static final TypeReference<Map<String, Object>>
LOOKUPS_ALL_GENERIC_REFERENCE =
+ new TypeReference<>() {};
+
// Lookups state (loaded/to-be-loaded/to-be-dropped etc) is managed by
immutable LookupUpdateState instance.
// Any update to state is done by creating updated LookupUpdateState
instance and atomically setting that
// into the ref here.
@@ -105,7 +111,9 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
//for unit testing only
private final boolean testMode;
- private final CoordinatorClient coordinatorClient;
+ private final DruidLeaderClient druidLeaderClient;
+
+ private final ObjectMapper jsonMapper;
private final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig;
@@ -117,18 +125,18 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
public LookupReferencesManager(
LookupConfig lookupConfig,
@Json ObjectMapper objectMapper,
- CoordinatorClient coordinatorClient,
+ @Coordinator DruidLeaderClient druidLeaderClient,
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig
)
{
- this(lookupConfig, objectMapper, coordinatorClient,
lookupListeningAnnouncerConfig, false);
+ this(lookupConfig, objectMapper, druidLeaderClient,
lookupListeningAnnouncerConfig, false);
}
@VisibleForTesting
LookupReferencesManager(
LookupConfig lookupConfig,
ObjectMapper objectMapper,
- CoordinatorClient coordinatorClient,
+ DruidLeaderClient druidLeaderClient,
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
boolean testMode
)
@@ -138,7 +146,8 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
} else {
this.lookupSnapshotTaker = new LookupSnapshotTaker(objectMapper,
lookupConfig.getSnapshotWorkingDir());
}
- this.coordinatorClient = coordinatorClient;
+ this.druidLeaderClient = druidLeaderClient;
+ this.jsonMapper = objectMapper;
this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig;
this.lookupConfig = lookupConfig;
this.testMode = testMode;
@@ -277,11 +286,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.NONE ||
(lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED
&& !lookupLoadingSpec.getLookupsToLoad().contains(lookupName))) {
- LOG.info(
- "Skipping notice to add lookup[%s] since current lookup loading
mode[%s] does not allow it.",
- lookupName,
- lookupLoadingSpec.getMode()
- );
+ LOG.info("Skipping notice to add lookup [%s] since current lookup
loading mode [%s] does not allow it.", lookupName, lookupLoadingSpec.getMode());
return;
}
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer,
lookupConfig.getLookupStartRetries()));
@@ -396,8 +401,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
lookupBeanList = getLookupsList();
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED
&& lookupBeanList != null) {
lookupBeanList = lookupBeanList.stream()
- .filter(lookupBean ->
lookupLoadingSpec.getLookupsToLoad()
-
.contains(lookupBean.getName()))
+ .filter(lookupBean ->
lookupLoadingSpec.getLookupsToLoad().contains(lookupBean.getName()))
.collect(Collectors.toList());
}
}
@@ -433,6 +437,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
* Returns a list of lookups from the coordinator if the coordinator is
available. If it's not available, returns null.
*
* @param tier lookup tier name
+ *
* @return list of LookupBean objects, or null
*/
@Nullable
@@ -471,24 +476,37 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
@Nullable
private Map<String, LookupExtractorFactoryContainer>
tryGetLookupListFromCoordinator(String tier)
+ throws IOException, InterruptedException
{
- try {
- return
FutureUtils.getUnchecked(coordinatorClient.fetchLookupsForTier(tier), true);
+ final StringFullResponseHolder response = fetchLookupsForTier(tier);
+ if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+ LOG.warn("No lookups found for tier [%s], response [%s]", tier,
response);
+ return null;
+ } else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new IOE(
+ "Error while fetching lookup code from Coordinator with status[%s]
and content[%s]",
+ response.getStatus(),
+ response.getContent()
+ );
}
- catch (Exception e) {
- Throwable rootCause = Throwables.getRootCause(e);
- if (rootCause instanceof HttpResponseException) {
- final HttpResponseException httpException = (HttpResponseException)
rootCause;
- if
(httpException.getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
- LOG.info(
- "No lookups found for tier [%s], status [%s]",
- tier,
- httpException.getResponse().getStatus()
- );
- return null;
- }
- }
- throw e;
+
+ // Older version of getSpecificTier returns a list of lookup names.
+ // Lookup loading is performed via snapshot if older version is present.
+ // This check is only for backward compatibility and should be removed in
a future release
+ if (response.getContent().startsWith("[")) {
+ LOG.info(
+ "Failed to retrieve lookup information from coordinator, " +
+ "because coordinator appears to be running on older Druid version. "
+
+ "Attempting to load lookups using snapshot instead"
+ );
+ return null;
+ } else {
+ Map<String, Object> lookupNameToGenericConfig =
+ jsonMapper.readValue(response.getContent(),
LOOKUPS_ALL_GENERIC_REFERENCE);
+ return LookupUtils.tryConvertObjectMapToLookupConfigMap(
+ lookupNameToGenericConfig,
+ jsonMapper
+ );
}
}
@@ -610,6 +628,15 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
}
+ private StringFullResponseHolder fetchLookupsForTier(String tier) throws
InterruptedException, IOException
+ {
+ return druidLeaderClient.go(
+ druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+
StringUtils.format("/druid/coordinator/v1/lookups/config/%s?detailed=true",
tier)
+ )
+ );
+ }
private void dropContainer(LookupExtractorFactoryContainer container, String
lookupName)
{
if (container != null) {
@@ -624,12 +651,10 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
}
}
-
@VisibleForTesting
interface Notice
{
- void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager)
- throws Exception;
+ void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager) throws Exception;
}
private static class LoadNotice implements Notice
@@ -716,7 +741,6 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
});
}
-
@Override
public String toString()
{
diff --git
a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
index b6df230e034..4539bda8186 100644
---
a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
+++
b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
@@ -29,16 +29,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Injector;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
-import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.lookup.LookupExtractorFactory;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.MapLookupExtractorFactory;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.segment.column.ColumnType;
@@ -99,10 +95,7 @@ public class CoordinatorClientImplTest
jsonMapper.setInjectableValues(
new InjectableValues.Std(ImmutableMap.of(
DataSegment.PruneSpecsHolder.class.getName(),
- DataSegment.PruneSpecsHolder.DEFAULT
- ))
- );
- jsonMapper.registerSubtypes(MapLookupExtractorFactory.class);
+ DataSegment.PruneSpecsHolder.DEFAULT)));
serviceClient = new MockServiceClient();
coordinatorClient = new CoordinatorClientImpl(serviceClient, jsonMapper);
}
@@ -151,10 +144,7 @@ public class CoordinatorClientImplTest
.build();
serviceClient.expectAndRespond(
- new RequestBuilder(
- HttpMethod.GET,
-
"/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=false"
- ),
+ new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=false"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(segment)
@@ -355,13 +345,10 @@ public class CoordinatorClientImplTest
.size(1)
.build(),
serverMetadataSet
- );
+ );
serviceClient.expectAndRespond(
- new RequestBuilder(
- HttpMethod.GET,
-
"/druid/coordinator/v1/datasources/xyz/intervals/2001-01-01T00:00:00.000Z_2002-01-01T00:00:00.000Z/serverview?full"
- ),
+ new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/datasources/xyz/intervals/2001-01-01T00:00:00.000Z_2002-01-01T00:00:00.000Z/serverview?full"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(Collections.singletonList(immutableSegmentLoadInfo1))
@@ -379,10 +366,7 @@ public class CoordinatorClientImplTest
);
serviceClient.expectAndRespond(
- new RequestBuilder(
- HttpMethod.GET,
-
"/druid/coordinator/v1/datasources/xyz/intervals/2501-01-01T00:00:00.000Z_2502-01-01T00:00:00.000Z/serverview?full"
- ),
+ new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/datasources/xyz/intervals/2501-01-01T00:00:00.000Z_2502-01-01T00:00:00.000Z/serverview?full"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(Collections.singletonList(immutableSegmentLoadInfo2))
@@ -487,35 +471,4 @@ public class CoordinatorClientImplTest
Assert.assertNull(coordinatorClient.updateCoordinatorDynamicConfig(config).get());
}
-
- @Test
- public void test_fetchLookupsForTier_detailedEnabled() throws Exception
- {
- LookupExtractorFactory lookupData = new MapLookupExtractorFactory(
- Map.of(
- "77483", "United States",
- "77484", "India"
- ),
- true
- );
- LookupExtractorFactoryContainer lookupDataContainer = new
LookupExtractorFactoryContainer("v0", lookupData);
- Map<String, LookupExtractorFactoryContainer> lookups = Map.of(
- "default_tier", lookupDataContainer
- );
-
- serviceClient.expectAndRespond(
- new RequestBuilder(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/default_tier?detailed=true"
- ),
- HttpResponseStatus.OK,
- Map.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
- DefaultObjectMapper.INSTANCE.writeValueAsBytes(lookups)
- );
-
- Assert.assertEquals(
- lookups,
-
FutureUtils.getUnchecked(coordinatorClient.fetchLookupsForTier("default_tier"),
true)
- );
- }
}
diff --git
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
index 3631be75a74..d5b179176c6 100644
---
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
+++
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
@@ -22,18 +22,18 @@ package org.apache.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.Futures;
-import org.apache.druid.client.coordinator.CoordinatorClientImpl;
+import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
-import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -41,6 +41,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -59,7 +60,7 @@ public class LookupReferencesManagerTest
LookupExtractorFactory lookupExtractorFactory;
LookupExtractorFactoryContainer container;
ObjectMapper mapper = new DefaultObjectMapper();
- private CoordinatorClientImpl coordinatorClient;
+ private DruidLeaderClient druidLeaderClient;
private LookupListeningAnnouncerConfig config;
@Before
@@ -67,7 +68,7 @@ public class LookupReferencesManagerTest
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
- coordinatorClient = EasyMock.createMock(CoordinatorClientImpl.class);
+ druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
config = EasyMock.createMock(LookupListeningAnnouncerConfig.class);
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes();
@@ -80,29 +81,50 @@ public class LookupReferencesManagerTest
);
container = new LookupExtractorFactoryContainer("v0",
lookupExtractorFactory);
mapper.registerSubtypes(MapLookupExtractorFactory.class);
+ String temporaryPath = temporaryFolder.newFolder().getAbsolutePath();
lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
mapper,
- coordinatorClient,
+ druidLeaderClient,
config,
true
);
}
+ private static HttpResponse newEmptyResponse(final HttpResponseStatus status)
+ {
+ final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
+ EasyMock.expect(response.getStatus()).andReturn(status).anyTimes();
+ EasyMock.expect(response.getContent()).andReturn(new
BigEndianHeapChannelBuffer(0));
+ EasyMock.replay(response);
+ return response;
+ }
+
@Test
- public void testStartStop() throws IOException
+ public void testStartStop() throws InterruptedException, IOException
{
lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(null),
- mapper, coordinatorClient, config
+ mapper, druidLeaderClient, config
);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForStartStop", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
Assert.assertFalse(lookupReferencesManager.lifecycleLock.awaitStarted(1,
TimeUnit.MICROSECONDS));
Assert.assertNull(lookupReferencesManager.mainThread);
Assert.assertNull(lookupReferencesManager.stateRef.get());
@@ -150,12 +172,23 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForAddGetRemove", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -182,12 +215,23 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForAddGetRemove", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -228,12 +272,23 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForAddGetRemove", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -263,7 +318,6 @@ public class LookupReferencesManagerTest
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
}
-
@Test
public void testCloseIsCalledAfterStopping() throws Exception
{
@@ -272,12 +326,23 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForCloseIsCalledAfterStopping", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testMock", new
LookupExtractorFactoryContainer("0", lookupExtractorFactory));
lookupReferencesManager.handlePendingNotices();
@@ -295,12 +360,23 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
EasyMock.replay(lookupExtractorFactory);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForDestroyIsCalledAfterRemove", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
LookupExtractorFactoryContainer container = new
LookupExtractorFactoryContainer("0", lookupExtractorFactory);
lookupReferencesManager.start();
lookupReferencesManager.add("testMock", container);
@@ -315,12 +391,23 @@ public class LookupReferencesManagerTest
@Test
public void testGetNotThere() throws Exception
{
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForGetNotThere", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(),
lookupReferencesManager.get("notThere"));
}
@@ -337,12 +424,23 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory2.start()).andReturn(true).once();
EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForUpdateWithHigherVersion", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testName", new
LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
lookupReferencesManager.handlePendingNotices();
@@ -362,12 +460,23 @@ public class LookupReferencesManagerTest
LookupExtractorFactory lookupExtractorFactory2 =
EasyMock.createNiceMock(LookupExtractorFactory.class);
EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForUpdateWithLowerVersion", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testName", new
LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
lookupReferencesManager.handlePendingNotices();
@@ -385,27 +494,48 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory1.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory1.isInitialized()).andReturn(false).anyTimes();
EasyMock.replay(lookupExtractorFactory1);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testName", new
LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
lookupReferencesManager.handlePendingNotices();
Assert.assertTrue(lookupReferencesManager.get("testName").isPresent());
EasyMock.verify(lookupExtractorFactory1);
}
-
@Test
public void testRemoveNonExisting() throws Exception
{
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForRemoveNonExisting", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.remove("test", null);
lookupReferencesManager.handlePendingNotices();
@@ -423,11 +553,20 @@ public class LookupReferencesManagerTest
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key2", "value2"), true)
);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(
+ druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")
+ ).andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("one", container1);
lookupReferencesManager.add("two", container2);
@@ -437,7 +576,7 @@ public class LookupReferencesManagerTest
Assert.assertEquals(
ImmutableSet.of("one", "two"),
- (lookupReferencesManager).getAllLookupNames()
+ ((LookupExtractorFactoryContainerProvider)
lookupReferencesManager).getAllLookupNames()
);
}
@@ -480,11 +619,22 @@ public class LookupReferencesManagerTest
), true
)
);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("one", container1);
lookupReferencesManager.add("two", container2);
@@ -510,14 +660,25 @@ public class LookupReferencesManagerTest
{
LookupReferencesManager lookupReferencesManager = new
LookupReferencesManager(
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
- mapper, coordinatorClient, config
+ mapper, druidLeaderClient, config
);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForRealModeWithMainThread", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertTrue(lookupReferencesManager.mainThread.isAlive());
@@ -588,14 +749,25 @@ public class LookupReferencesManagerTest
), true
)
);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testLookup1", container1);
lookupMap.put("testLookup2", container2);
lookupMap.put("testLookup3", container3);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.of(container1),
lookupReferencesManager.get("testLookup1"));
@@ -624,16 +796,27 @@ public class LookupReferencesManagerTest
)
);
EasyMock.reset(config);
- EasyMock.reset(coordinatorClient);
+ EasyMock.reset(druidLeaderClient);
Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testLookup1", container1);
lookupMap.put("testLookup2", container2);
lookupMap.put("testLookup3", container3);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(lookupLoadingSpec);
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
return lookupMap;
@@ -661,9 +844,7 @@ public class LookupReferencesManagerTest
public void testCoordinatorLoadSubsetOfLookups() throws Exception
{
Map<String, LookupExtractorFactoryContainer> lookupMap =
- getLookupMapForSelectiveLoadingOfLookups(
- LookupLoadingSpec.loadOnly(ImmutableSet.of("testLookup1",
"testLookup2"))
- );
+
getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.loadOnly(ImmutableSet.of("testLookup1",
"testLookup2")));
Assert.assertEquals(Optional.of(lookupMap.get("testLookup1")),
lookupReferencesManager.get("testLookup1"));
Assert.assertEquals(Optional.of(lookupMap.get("testLookup2")),
lookupReferencesManager.get("testLookup2"));
Assert.assertFalse(lookupReferencesManager.get("testLookup3").isPresent());
@@ -722,15 +903,21 @@ public class LookupReferencesManagerTest
lookupReferencesManager = new LookupReferencesManager(
lookupConfig,
mapper,
- coordinatorClient,
+ druidLeaderClient,
config
);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
-
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andThrow(new
RuntimeException()).anyTimes();
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request)
+ .anyTimes();
+ EasyMock.expect(druidLeaderClient.go(request)).andThrow(new
IllegalStateException()).anyTimes();
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testMockForLoadLookupOnCoordinatorFailure",
container);
@@ -748,26 +935,23 @@ public class LookupReferencesManagerTest
lookupReferencesManager = new LookupReferencesManager(
lookupConfig,
mapper,
- coordinatorClient,
+ druidLeaderClient,
config,
true
);
EasyMock.reset(config);
- EasyMock.reset(coordinatorClient);
+ EasyMock.reset(druidLeaderClient);
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes();
EasyMock.replay(config);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(
- Futures.immediateFailedFuture(
- new HttpResponseException(
- new StringFullResponseHolder(
- new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.NOT_FOUND),
- StandardCharsets.UTF_8
- )
- )
- )
- ).anyTimes();
- EasyMock.replay(coordinatorClient);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request)
+ .anyTimes();
+ EasyMock.expect(druidLeaderClient.go(request)).andThrow(new
IllegalStateException()).anyTimes();
+ EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(
Optional.of(container),
@@ -789,15 +973,26 @@ public class LookupReferencesManagerTest
LookupReferencesManager lookupReferencesManager = new
LookupReferencesManager(
lookupConfig,
mapper,
- coordinatorClient,
+ druidLeaderClient,
config
);
- Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
+ Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForDisableLookupSync", container);
+ String strResult = mapper.writeValueAsString(lookupMap);
+ Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
+ EasyMock.expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
+ .andReturn(request);
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ newEmptyResponse(HttpResponseStatus.OK),
+ StandardCharsets.UTF_8
+ ).addChunk(strResult);
+ EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
-
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(),
lookupReferencesManager.get("testMockForDisableLookupSync"));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]