This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6b3cb50cdd8 Use CoordinatorClient for fetching tier lookup (#18142)
6b3cb50cdd8 is described below
commit 6b3cb50cdd8fc8c3dd9db2b55351758634c975a5
Author: Uddeshya Singh <[email protected]>
AuthorDate: Fri Jun 27 18:08:40 2025 +0530
Use CoordinatorClient for fetching tier lookup (#18142)
This patch is part of the effort to phase out `DruidLeaderClient` and
utilize `CoordinatorClient`
to interact with Coordinators.
This patch does the following:
- Add API `CoordinatorClient.fetchLookupsForTier()`
- Replace use of `DruidLeaderClient` in `LookupReferencesManager` with
`CoordinatorClient`
---
.../client/coordinator/CoordinatorClient.java | 11 +
.../client/coordinator/CoordinatorClientImpl.java | 36 ++
.../client/coordinator/NoopCoordinatorClient.java | 10 +
.../query/lookup/LookupReferencesManager.java | 98 +++---
.../coordinator/CoordinatorClientImplTest.java | 57 +++-
.../query/lookup/LookupReferencesManagerTest.java | 365 +++++----------------
6 files changed, 231 insertions(+), 346 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index c4f112917c8..ba06b43b737 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.compaction.CompactionStatusResponse;
@@ -32,6 +33,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public interface CoordinatorClient
@@ -94,4 +96,13 @@ public interface CoordinatorClient
* API: {@code GET /druid/coordinator/v1/config}
*/
ListenableFuture<CoordinatorDynamicConfig> getCoordinatorDynamicConfig();
+
+ /**
+ * Gets the lookup configuration for a tier
+ * <p>
+ * API: {@code GET /druid/coordinator/v1/lookups/config/<tier>}
+ *
+ * @param tier The name of the tier for which the lookup
configuration is to be fetched.
+ */
+ ListenableFuture<Map<String, LookupExtractorFactoryContainer>>
fetchLookupsForTier(String tier);
}
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
index bb89bc965e8..092bf5c2a5c 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
@@ -30,8 +30,11 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupUtils;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
@@ -46,6 +49,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class CoordinatorClientImpl implements CoordinatorClient
@@ -239,4 +243,36 @@ public class CoordinatorClientImpl implements
CoordinatorClient
)
);
}
+
+ @Override
+ public ListenableFuture<Map<String, LookupExtractorFactoryContainer>>
fetchLookupsForTier(
+ String tier
+ )
+ {
+ final String path = StringUtils.format(
+ "/druid/coordinator/v1/lookups/config/%s?detailed=true",
+ StringUtils.urlEncode(tier)
+ );
+
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, path),
+ new BytesFullResponseHandler()
+ ),
+ this::extractLookupFactory
+ );
+ }
+
+ private Map<String, LookupExtractorFactoryContainer>
extractLookupFactory(BytesFullResponseHolder holder)
+ {
+ Map<String, Object> lookupNameToGenericConfig = JacksonUtils.readValue(
+ jsonMapper,
+ holder.getContent(),
+ new TypeReference<>() {}
+ );
+ return LookupUtils.tryConvertObjectMapToLookupConfigMap(
+ lookupNameToGenericConfig,
+ jsonMapper
+ );
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
index f65be567ff0..ceb5dd1944b 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.compaction.CompactionStatusResponse;
@@ -32,6 +33,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class NoopCoordinatorClient implements CoordinatorClient
@@ -97,4 +99,12 @@ public class NoopCoordinatorClient implements
CoordinatorClient
throw new UnsupportedOperationException();
}
+ @Override
+ public ListenableFuture<Map<String, LookupExtractorFactoryContainer>>
fetchLookupsForTier(
+ String tier
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+
}
diff --git
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index ae8a2f41acd..0af00cb6153 100644
---
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -20,34 +20,31 @@
package org.apache.druid.query.lookup;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.concurrent.LifecycleLock;
-import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
@@ -78,7 +75,7 @@ import java.util.stream.Collectors;
* This class provide a basic {@link LookupExtractorFactory} references
manager. It allows basic operations fetching,
* listing, adding and deleting of {@link LookupExtractor} objects, and can
take periodic snap shot of the loaded lookup
* extractor specifications in order to bootstrap nodes after restart.
- *
+ * <p>
* It also implements {@link LookupExtractorFactoryContainerProvider}, to
supply queries and indexing transformations
* with a reference to a {@link LookupExtractorFactoryContainer}. This class
is a companion of
* {@link org.apache.druid.server.lookup.cache.LookupCoordinatorManager},
which communicates with
@@ -89,9 +86,6 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
{
private static final EmittingLogger LOG = new
EmittingLogger(LookupReferencesManager.class);
- private static final TypeReference<Map<String, Object>>
LOOKUPS_ALL_GENERIC_REFERENCE =
- new TypeReference<>() {};
-
// Lookups state (loaded/to-be-loaded/to-be-dropped etc) is managed by
immutable LookupUpdateState instance.
// Any update to state is done by creating updated LookupUpdateState
instance and atomically setting that
// into the ref here.
@@ -111,9 +105,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
//for unit testing only
private final boolean testMode;
- private final DruidLeaderClient druidLeaderClient;
-
- private final ObjectMapper jsonMapper;
+ private final CoordinatorClient coordinatorClient;
private final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig;
@@ -125,18 +117,18 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
public LookupReferencesManager(
LookupConfig lookupConfig,
@Json ObjectMapper objectMapper,
- @Coordinator DruidLeaderClient druidLeaderClient,
+ CoordinatorClient coordinatorClient,
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig
)
{
- this(lookupConfig, objectMapper, druidLeaderClient,
lookupListeningAnnouncerConfig, false);
+ this(lookupConfig, objectMapper, coordinatorClient,
lookupListeningAnnouncerConfig, false);
}
@VisibleForTesting
LookupReferencesManager(
LookupConfig lookupConfig,
ObjectMapper objectMapper,
- DruidLeaderClient druidLeaderClient,
+ CoordinatorClient coordinatorClient,
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
boolean testMode
)
@@ -146,8 +138,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
} else {
this.lookupSnapshotTaker = new LookupSnapshotTaker(objectMapper,
lookupConfig.getSnapshotWorkingDir());
}
- this.druidLeaderClient = druidLeaderClient;
- this.jsonMapper = objectMapper;
+ this.coordinatorClient = coordinatorClient;
this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig;
this.lookupConfig = lookupConfig;
this.testMode = testMode;
@@ -286,7 +277,11 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.NONE ||
(lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED
&& !lookupLoadingSpec.getLookupsToLoad().contains(lookupName))) {
- LOG.info("Skipping notice to add lookup [%s] since current lookup
loading mode [%s] does not allow it.", lookupName, lookupLoadingSpec.getMode());
+ LOG.info(
+ "Skipping notice to add lookup[%s] since current lookup loading
mode[%s] does not allow it.",
+ lookupName,
+ lookupLoadingSpec.getMode()
+ );
return;
}
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer,
lookupConfig.getLookupStartRetries()));
@@ -401,7 +396,8 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
lookupBeanList = getLookupsList();
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED
&& lookupBeanList != null) {
lookupBeanList = lookupBeanList.stream()
- .filter(lookupBean ->
lookupLoadingSpec.getLookupsToLoad().contains(lookupBean.getName()))
+ .filter(lookupBean ->
lookupLoadingSpec.getLookupsToLoad()
+
.contains(lookupBean.getName()))
.collect(Collectors.toList());
}
}
@@ -437,7 +433,6 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
* Returns a list of lookups from the coordinator if the coordinator is
available. If it's not available, returns null.
*
* @param tier lookup tier name
- *
* @return list of LookupBean objects, or null
*/
@Nullable
@@ -476,37 +471,24 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
@Nullable
private Map<String, LookupExtractorFactoryContainer>
tryGetLookupListFromCoordinator(String tier)
- throws IOException, InterruptedException
{
- final StringFullResponseHolder response = fetchLookupsForTier(tier);
- if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
- LOG.warn("No lookups found for tier [%s], response [%s]", tier,
response);
- return null;
- } else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
- throw new IOE(
- "Error while fetching lookup code from Coordinator with status[%s]
and content[%s]",
- response.getStatus(),
- response.getContent()
- );
+ try {
+ return
FutureUtils.getUnchecked(coordinatorClient.fetchLookupsForTier(tier), true);
}
-
- // Older version of getSpecificTier returns a list of lookup names.
- // Lookup loading is performed via snapshot if older version is present.
- // This check is only for backward compatibility and should be removed in
a future release
- if (response.getContent().startsWith("[")) {
- LOG.info(
- "Failed to retrieve lookup information from coordinator, " +
- "because coordinator appears to be running on older Druid version. "
+
- "Attempting to load lookups using snapshot instead"
- );
- return null;
- } else {
- Map<String, Object> lookupNameToGenericConfig =
- jsonMapper.readValue(response.getContent(),
LOOKUPS_ALL_GENERIC_REFERENCE);
- return LookupUtils.tryConvertObjectMapToLookupConfigMap(
- lookupNameToGenericConfig,
- jsonMapper
- );
+ catch (Exception e) {
+ Throwable rootCause = Throwables.getRootCause(e);
+ if (rootCause instanceof HttpResponseException) {
+ final HttpResponseException httpException = (HttpResponseException)
rootCause;
+ if
(httpException.getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+ LOG.info(
+ "No lookups found for tier [%s], status [%s]",
+ tier,
+ httpException.getResponse().getStatus()
+ );
+ return null;
+ }
+ }
+ throw e;
}
}
@@ -628,15 +610,6 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
}
- private StringFullResponseHolder fetchLookupsForTier(String tier) throws
InterruptedException, IOException
- {
- return druidLeaderClient.go(
- druidLeaderClient.makeRequest(
- HttpMethod.GET,
-
StringUtils.format("/druid/coordinator/v1/lookups/config/%s?detailed=true",
tier)
- )
- );
- }
private void dropContainer(LookupExtractorFactoryContainer container, String
lookupName)
{
if (container != null) {
@@ -651,10 +624,12 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
}
}
+
@VisibleForTesting
interface Notice
{
- void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager) throws Exception;
+ void handle(Map<String, LookupExtractorFactoryContainer> lookupMap,
LookupReferencesManager manager)
+ throws Exception;
}
private static class LoadNotice implements Notice
@@ -741,6 +716,7 @@ public class LookupReferencesManager implements
LookupExtractorFactoryContainerP
}
});
}
+
@Override
public String toString()
{
diff --git
a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
index 67180d9e1b2..ab4b01ab4c7 100644
---
a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
+++
b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
@@ -29,12 +29,16 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Injector;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.lookup.LookupExtractorFactory;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.MapLookupExtractorFactory;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.segment.column.ColumnType;
@@ -95,7 +99,10 @@ public class CoordinatorClientImplTest
jsonMapper.setInjectableValues(
new InjectableValues.Std(ImmutableMap.of(
DataSegment.PruneSpecsHolder.class.getName(),
- DataSegment.PruneSpecsHolder.DEFAULT)));
+ DataSegment.PruneSpecsHolder.DEFAULT
+ ))
+ );
+ jsonMapper.registerSubtypes(MapLookupExtractorFactory.class);
serviceClient = new MockServiceClient();
coordinatorClient = new CoordinatorClientImpl(serviceClient, jsonMapper);
}
@@ -144,7 +151,10 @@ public class CoordinatorClientImplTest
.build();
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=false"),
+ new RequestBuilder(
+ HttpMethod.GET,
+
"/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=false"
+ ),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(segment)
@@ -345,10 +355,13 @@ public class CoordinatorClientImplTest
.size(1)
.build(),
serverMetadataSet
- );
+ );
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/datasources/xyz/intervals/2001-01-01T00:00:00.000Z_2002-01-01T00:00:00.000Z/serverview?full"),
+ new RequestBuilder(
+ HttpMethod.GET,
+
"/druid/coordinator/v1/datasources/xyz/intervals/2001-01-01T00:00:00.000Z_2002-01-01T00:00:00.000Z/serverview?full"
+ ),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(Collections.singletonList(immutableSegmentLoadInfo1))
@@ -366,7 +379,10 @@ public class CoordinatorClientImplTest
);
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/datasources/xyz/intervals/2501-01-01T00:00:00.000Z_2502-01-01T00:00:00.000Z/serverview?full"),
+ new RequestBuilder(
+ HttpMethod.GET,
+
"/druid/coordinator/v1/datasources/xyz/intervals/2501-01-01T00:00:00.000Z_2502-01-01T00:00:00.000Z/serverview?full"
+ ),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(Collections.singletonList(immutableSegmentLoadInfo2))
@@ -450,4 +466,35 @@ public class CoordinatorClientImplTest
coordinatorClient.getCoordinatorDynamicConfig().get()
);
}
+
+ @Test
+ public void test_fetchLookupsForTier_detailedEnabled() throws Exception
+ {
+ LookupExtractorFactory lookupData = new MapLookupExtractorFactory(
+ Map.of(
+ "77483", "United States",
+ "77484", "India"
+ ),
+ true
+ );
+ LookupExtractorFactoryContainer lookupDataContainer = new
LookupExtractorFactoryContainer("v0", lookupData);
+ Map<String, LookupExtractorFactoryContainer> lookups = Map.of(
+ "default_tier", lookupDataContainer
+ );
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/default_tier?detailed=true"
+ ),
+ HttpResponseStatus.OK,
+ Map.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ DefaultObjectMapper.INSTANCE.writeValueAsBytes(lookups)
+ );
+
+ Assert.assertEquals(
+ lookups,
+
FutureUtils.getUnchecked(coordinatorClient.fetchLookupsForTier("default_tier"),
true)
+ );
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
index d5b179176c6..3631be75a74 100644
---
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
+++
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
@@ -22,18 +22,18 @@ package org.apache.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.discovery.DruidLeaderClient;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.client.coordinator.CoordinatorClientImpl;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
-import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -41,7 +41,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
-import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -60,7 +59,7 @@ public class LookupReferencesManagerTest
LookupExtractorFactory lookupExtractorFactory;
LookupExtractorFactoryContainer container;
ObjectMapper mapper = new DefaultObjectMapper();
- private DruidLeaderClient druidLeaderClient;
+ private CoordinatorClientImpl coordinatorClient;
private LookupListeningAnnouncerConfig config;
@Before
@@ -68,7 +67,7 @@ public class LookupReferencesManagerTest
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
- druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
+ coordinatorClient = EasyMock.createMock(CoordinatorClientImpl.class);
config = EasyMock.createMock(LookupListeningAnnouncerConfig.class);
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes();
@@ -81,50 +80,29 @@ public class LookupReferencesManagerTest
);
container = new LookupExtractorFactoryContainer("v0",
lookupExtractorFactory);
mapper.registerSubtypes(MapLookupExtractorFactory.class);
- String temporaryPath = temporaryFolder.newFolder().getAbsolutePath();
lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
mapper,
- druidLeaderClient,
+ coordinatorClient,
config,
true
);
}
- private static HttpResponse newEmptyResponse(final HttpResponseStatus status)
- {
- final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
- EasyMock.expect(response.getStatus()).andReturn(status).anyTimes();
- EasyMock.expect(response.getContent()).andReturn(new
BigEndianHeapChannelBuffer(0));
- EasyMock.replay(response);
- return response;
- }
-
@Test
- public void testStartStop() throws InterruptedException, IOException
+ public void testStartStop() throws IOException
{
lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(null),
- mapper, druidLeaderClient, config
+ mapper, coordinatorClient, config
);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForStartStop", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
Assert.assertFalse(lookupReferencesManager.lifecycleLock.awaitStarted(1,
TimeUnit.MICROSECONDS));
Assert.assertNull(lookupReferencesManager.mainThread);
Assert.assertNull(lookupReferencesManager.stateRef.get());
@@ -172,23 +150,12 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForAddGetRemove", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -215,23 +182,12 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForAddGetRemove", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -272,23 +228,12 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForAddGetRemove", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
@@ -318,6 +263,7 @@ public class LookupReferencesManagerTest
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
}
+
@Test
public void testCloseIsCalledAfterStopping() throws Exception
{
@@ -326,23 +272,12 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
EasyMock.replay(lookupExtractorFactory);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForCloseIsCalledAfterStopping", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+
EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testMock", new
LookupExtractorFactoryContainer("0", lookupExtractorFactory));
lookupReferencesManager.handlePendingNotices();
@@ -360,23 +295,12 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
EasyMock.replay(lookupExtractorFactory);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForDestroyIsCalledAfterRemove", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
lookupReferencesManager.start();
lookupReferencesManager.add("testMock", container);
@@ -391,23 +315,12 @@ public class LookupReferencesManagerTest
@Test
public void testGetNotThere() throws Exception
{
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForGetNotThere", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("notThere"));
}
@@ -424,23 +337,12 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory2.start()).andReturn(true).once();
EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForUpdateWithHigherVersion", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
lookupReferencesManager.handlePendingNotices();
@@ -460,23 +362,12 @@ public class LookupReferencesManagerTest
LookupExtractorFactory lookupExtractorFactory2 = EasyMock.createNiceMock(LookupExtractorFactory.class);
EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForUpdateWithLowerVersion", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
lookupReferencesManager.handlePendingNotices();
@@ -494,48 +385,27 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory1.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory1.isInitialized()).andReturn(false).anyTimes();
EasyMock.replay(lookupExtractorFactory1);
- Map<String, Object> lookupMap = new HashMap<>();
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
lookupReferencesManager.handlePendingNotices();
Assert.assertTrue(lookupReferencesManager.get("testName").isPresent());
EasyMock.verify(lookupExtractorFactory1);
}
+
@Test
public void testRemoveNonExisting() throws Exception
{
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForRemoveNonExisting", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.remove("test", null);
lookupReferencesManager.handlePendingNotices();
@@ -553,20 +423,11 @@ public class LookupReferencesManagerTest
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key2", "value2"), true)
);
- Map<String, Object> lookupMap = new HashMap<>();
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(
- druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")
- ).andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("one", container1);
lookupReferencesManager.add("two", container2);
@@ -576,7 +437,7 @@ public class LookupReferencesManagerTest
Assert.assertEquals(
ImmutableSet.of("one", "two"),
- ((LookupExtractorFactoryContainerProvider) lookupReferencesManager).getAllLookupNames()
+ (lookupReferencesManager).getAllLookupNames()
);
}
@@ -619,22 +480,11 @@ public class LookupReferencesManagerTest
), true
)
);
- Map<String, Object> lookupMap = new HashMap<>();
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("one", container1);
lookupReferencesManager.add("two", container2);
@@ -660,25 +510,14 @@ public class LookupReferencesManagerTest
{
LookupReferencesManager lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
- mapper, druidLeaderClient, config
+ mapper, coordinatorClient, config
);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForRealModeWithMainThread", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertTrue(lookupReferencesManager.mainThread.isAlive());
@@ -749,25 +588,14 @@ public class LookupReferencesManagerTest
), true
)
);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testLookup1", container1);
lookupMap.put("testLookup2", container2);
lookupMap.put("testLookup3", container3);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertEquals(Optional.of(container1), lookupReferencesManager.get("testLookup1"));
@@ -796,27 +624,16 @@ public class LookupReferencesManagerTest
)
);
EasyMock.reset(config);
- EasyMock.reset(druidLeaderClient);
+ EasyMock.reset(coordinatorClient);
Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testLookup1", container1);
lookupMap.put("testLookup2", container2);
lookupMap.put("testLookup3", container3);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(lookupLoadingSpec);
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
return lookupMap;
@@ -844,7 +661,9 @@ public class LookupReferencesManagerTest
public void testCoordinatorLoadSubsetOfLookups() throws Exception
{
Map<String, LookupExtractorFactoryContainer> lookupMap =
- getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.loadOnly(ImmutableSet.of("testLookup1", "testLookup2")));
+ getLookupMapForSelectiveLoadingOfLookups(
+ LookupLoadingSpec.loadOnly(ImmutableSet.of("testLookup1", "testLookup2"))
+ );
Assert.assertEquals(Optional.of(lookupMap.get("testLookup1")), lookupReferencesManager.get("testLookup1"));
Assert.assertEquals(Optional.of(lookupMap.get("testLookup2")), lookupReferencesManager.get("testLookup2"));
Assert.assertFalse(lookupReferencesManager.get("testLookup3").isPresent());
@@ -903,21 +722,15 @@ public class LookupReferencesManagerTest
lookupReferencesManager = new LookupReferencesManager(
lookupConfig,
mapper,
- druidLeaderClient,
+ coordinatorClient,
config
);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request)
- .anyTimes();
- EasyMock.expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()).anyTimes();
- EasyMock.replay(druidLeaderClient);
+
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andThrow(new RuntimeException()).anyTimes();
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
lookupReferencesManager.add("testMockForLoadLookupOnCoordinatorFailure", container);
@@ -935,23 +748,26 @@ public class LookupReferencesManagerTest
lookupReferencesManager = new LookupReferencesManager(
lookupConfig,
mapper,
- druidLeaderClient,
+ coordinatorClient,
config,
true
);
EasyMock.reset(config);
- EasyMock.reset(druidLeaderClient);
+ EasyMock.reset(coordinatorClient);
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request)
- .anyTimes();
- EasyMock.expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()).anyTimes();
- EasyMock.replay(druidLeaderClient);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(
+ Futures.immediateFailedFuture(
+ new HttpResponseException(
+ new StringFullResponseHolder(
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND),
+ StandardCharsets.UTF_8
+ )
+ )
+ )
+ ).anyTimes();
+ EasyMock.replay(coordinatorClient);
lookupReferencesManager.start();
Assert.assertEquals(
Optional.of(container),
@@ -973,26 +789,15 @@ public class LookupReferencesManagerTest
LookupReferencesManager lookupReferencesManager = new LookupReferencesManager(
lookupConfig,
mapper,
- druidLeaderClient,
+ coordinatorClient,
config
);
- Map<String, Object> lookupMap = new HashMap<>();
+ Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testMockForDisableLookupSync", container);
- String strResult = mapper.writeValueAsString(lookupMap);
- Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
- EasyMock.expect(druidLeaderClient.makeRequest(
- HttpMethod.GET,
- "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
- ))
- .andReturn(request);
- StringFullResponseHolder responseHolder = new StringFullResponseHolder(
- newEmptyResponse(HttpResponseStatus.OK),
- StandardCharsets.UTF_8
- ).addChunk(strResult);
- EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
+ EasyMock.expect(coordinatorClient.fetchLookupsForTier(LOOKUP_TIER)).andReturn(Futures.immediateFuture(lookupMap));
lookupReferencesManager.start();
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("testMockForDisableLookupSync"));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]