jon-wei closed pull request #6444: fixes for LookupReferencesManagerTest
URL: https://github.com/apache/incubator-druid/pull/6444
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/docs/content/querying/lookups.md b/docs/content/querying/lookups.md
index 17dcef9c3eb..479376bef31 100644
--- a/docs/content/querying/lookups.md
+++ b/docs/content/querying/lookups.md
@@ -361,6 +361,7 @@ It is possible to save the configuration across restarts
such that a node will n
|`druid.lookup.numLookupLoadingThreads`|Number of threads for loading the
lookups in parallel on startup. This thread pool is destroyed once startup is
done. It is not kept during the lifetime of the JVM|Available Processors / 2|
|`druid.lookup.coordinatorFetchRetries`|How many times to retry to fetch the
lookup bean list from coordinator, during the sync on startup.|3|
|`druid.lookup.lookupStartRetries`|How many times to retry to start each
lookup, either during the sync on startup, or during the runtime.|3|
+|`druid.lookup.coordinatorRetryDelay`|How long to delay (in millis) between
retries to fetch lookup list from the coordinator during the sync on
startup.|60_000|
## Introspect a Lookup
diff --git
a/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
b/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
index fd2dc84b547..6746e08b4b3 100644
--- a/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
@@ -28,6 +28,7 @@
public class LookupConfig
{
+ static int DEFAULT_COORDINATOR_RETRY_DELAY = 60_000;
@JsonProperty("snapshotWorkingDir")
private String snapshotWorkingDir;
@@ -43,6 +44,13 @@
@JsonProperty("coordinatorFetchRetries")
private int coordinatorFetchRetries = 3;
+ // By default, add an extra minute in addition to the retry wait. In
RetryUtils, retry wait starts from a few
+ // seconds, that is likely not enough to coordinator to be back to healthy
state, e. g. if it experiences
+ // 30-second GC pause.
+ @Min(0)
+ @JsonProperty("coordinatorRetryDelay")
+ private int coordinatorRetryDelay = DEFAULT_COORDINATOR_RETRY_DELAY;
+
@Min(1)
@JsonProperty("lookupStartRetries")
private int lookupStartRetries = 3;
@@ -84,6 +92,11 @@ public int getLookupStartRetries()
return lookupStartRetries;
}
+ public int getCoordinatorRetryDelay()
+ {
+ return coordinatorRetryDelay;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -100,7 +113,8 @@ public boolean equals(Object o)
enableLookupSyncOnStartup == that.enableLookupSyncOnStartup &&
numLookupLoadingThreads == that.numLookupLoadingThreads &&
coordinatorFetchRetries == that.coordinatorFetchRetries &&
- lookupStartRetries == that.lookupStartRetries;
+ lookupStartRetries == that.lookupStartRetries &&
+ coordinatorRetryDelay == that.coordinatorRetryDelay;
}
@Override
@@ -111,7 +125,8 @@ public int hashCode()
enableLookupSyncOnStartup,
numLookupLoadingThreads,
coordinatorFetchRetries,
- lookupStartRetries
+ lookupStartRetries,
+ coordinatorRetryDelay
);
}
@@ -124,6 +139,7 @@ public String toString()
", numLookupLoadingThreads=" + numLookupLoadingThreads +
", coordinatorFetchRetries=" + coordinatorFetchRetries +
", lookupStartRetries=" + lookupStartRetries +
+ ", coordinatorRetryDelay=" + coordinatorRetryDelay +
'}';
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
b/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
index 6c268d92103..ca18913ab7b 100644
---
a/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
+++
b/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
@@ -53,7 +53,8 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"snapshotWorkingDir\": \"/tmp\",\n"
+ " \"numLookupLoadingThreads\": 4,\n"
+ " \"coordinatorFetchRetries\": 4,\n"
- + " \"lookupStartRetries\": 4 \n"
+ + " \"lookupStartRetries\": 4,\n"
+ + " \"coordinatorRetryDelay\": 100 \n"
+ "}\n";
LookupConfig config = mapper.readValue(
mapper.writeValueAsString(
@@ -67,5 +68,6 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertEquals(4, config.getNumLookupLoadingThreads());
Assert.assertEquals(4, config.getCoordinatorFetchRetries());
Assert.assertEquals(4, config.getLookupStartRetries());
+ Assert.assertEquals(100, config.getCoordinatorRetryDelay());
}
}
diff --git
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index 8d97cbee253..6a6ca6cd5a4 100644
---
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -389,11 +389,11 @@ private void loadAllLookupsAndInitStateRef()
() -> {
if (firstAttempt.isTrue()) {
firstAttempt.setValue(false);
- } else {
- // Adding an extra minute in addition to the retry wait. In
RetryUtils, retry wait starts from a few
- // seconds, that is likely not enough to coordinator to be back
to healthy state, e. g. if it experiences
- // 30-second GC pause.
- Thread.sleep(60_000);
+ } else if (lookupConfig.getCoordinatorRetryDelay() > 0) {
+ // Adding any configured extra time in addition to the retry
wait. In RetryUtils, retry wait starts from
+ // a few seconds, that is likely not enough to coordinator to be
back to healthy state, e. g. if it
+ // experiences 30-second GC pause. Default is 1 minute
+ Thread.sleep(lookupConfig.getCoordinatorRetryDelay());
}
return tryGetLookupListFromCoordinator(tier);
},
diff --git
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
index f155895a80e..5c0f16815e7 100644
---
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
+++
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
@@ -50,20 +50,15 @@
public class LookupReferencesManagerTest
{
- LookupReferencesManager lookupReferencesManager;
-
- private DruidLeaderClient druidLeaderClient;
-
- private LookupListeningAnnouncerConfig config;
-
private static final String LOOKUP_TIER = "lookupTier";
-
- LookupExtractorFactory lookupExtractorFactory;
-
- LookupExtractorFactoryContainer container;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ LookupReferencesManager lookupReferencesManager;
+ LookupExtractorFactory lookupExtractorFactory;
+ LookupExtractorFactoryContainer container;
ObjectMapper mapper = new DefaultObjectMapper();
+ private DruidLeaderClient druidLeaderClient;
+ private LookupListeningAnnouncerConfig config;
@Before
public void setUp() throws IOException
@@ -85,7 +80,9 @@ public void setUp() throws IOException
String temporaryPath = temporaryFolder.newFolder().getAbsolutePath();
lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
- mapper, druidLeaderClient, config,
+ mapper,
+ druidLeaderClient,
+ config,
true
);
}
@@ -104,7 +101,10 @@ public void testStartStop() throws InterruptedException,
IOException
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -165,7 +165,10 @@ public void testAddGetRemove() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -203,7 +206,10 @@ public void testCloseIsCalledAfterStopping() throws
Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -234,7 +240,10 @@ public void testDestroyIsCalledAfterRemove() throws
Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -262,7 +271,10 @@ public void testGetNotThere() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -292,7 +304,10 @@ public void testUpdateWithHigherVersion() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -326,7 +341,10 @@ public void testUpdateWithLowerVersion() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -354,7 +372,10 @@ public void testRemoveNonExisting() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -405,7 +426,10 @@ public void testGetAllLookupsState() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -447,7 +471,10 @@ public void testRealModeWithMainThread() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -523,7 +550,10 @@ public void testCoordinatorLookupSync() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -543,13 +573,31 @@ public void testCoordinatorLookupSync() throws Exception
@Test
public void testLoadLookupOnCoordinatorFailure() throws Exception
{
+ LookupConfig lookupConfig = new
LookupConfig(temporaryFolder.newFolder().getAbsolutePath())
+ {
+ @Override
+ public int getCoordinatorRetryDelay()
+ {
+ return 10;
+ }
+ };
+ lookupReferencesManager = new LookupReferencesManager(
+ lookupConfig,
+ mapper,
+ druidLeaderClient,
+ config
+ );
+
Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForLoadLookupOnCoordinatorFailure", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request)
.anyTimes();
FullResponseHolder responseHolder = new FullResponseHolder(
@@ -564,16 +612,30 @@ public void testLoadLookupOnCoordinatorFailure() throws
Exception
lookupReferencesManager.add("testMockForLoadLookupOnCoordinatorFailure",
container);
lookupReferencesManager.handlePendingNotices();
lookupReferencesManager.stop();
+ lookupConfig = new
LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile(LOOKUP_TIER).getParent())
+ {
+ @Override
+ public int getCoordinatorRetryDelay()
+ {
+ return 10;
+ }
+ };
+
lookupReferencesManager = new LookupReferencesManager(
- new
LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile(LOOKUP_TIER).getParent()),
- mapper, druidLeaderClient, config,
+ lookupConfig,
+ mapper,
+ druidLeaderClient,
+ config,
true
);
reset(config);
reset(druidLeaderClient);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request)
.anyTimes();
expect(druidLeaderClient.go(request)).andThrow(new
IllegalStateException()).anyTimes();
@@ -585,9 +647,19 @@ public void testLoadLookupOnCoordinatorFailure() throws
Exception
@Test
public void testDisableLookupSync() throws Exception
{
+ LookupConfig lookupConfig = new LookupConfig(null)
+ {
+ @Override
+ public boolean getEnableLookupSyncOnStartup()
+ {
+ return false;
+ }
+ };
LookupReferencesManager lookupReferencesManager = new
LookupReferencesManager(
- new LookupConfig(null),
- mapper, druidLeaderClient, config
+ lookupConfig,
+ mapper,
+ druidLeaderClient,
+ config
);
Map<String, Object> lookupMap = new HashMap<>();
lookupMap.put("testMockForDisableLookupSync", container);
@@ -595,7 +667,10 @@ public void testDisableLookupSync() throws Exception
Request request = new Request(HttpMethod.GET, new
URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
- expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
+ expect(druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+ ))
.andReturn(request);
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
@@ -607,5 +682,4 @@ public void testDisableLookupSync() throws Exception
lookupReferencesManager.start();
Assert.assertNull(lookupReferencesManager.get("testMockForDisableLookupSync"));
}
-
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org