jon-wei closed pull request #6444: fixes for LookupReferencesManagerTest
URL: https://github.com/apache/incubator-druid/pull/6444
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is supplied below:

diff --git a/docs/content/querying/lookups.md b/docs/content/querying/lookups.md
index 17dcef9c3eb..479376bef31 100644
--- a/docs/content/querying/lookups.md
+++ b/docs/content/querying/lookups.md
@@ -361,6 +361,7 @@ It is possible to save the configuration across restarts 
such that a node will n
 |`druid.lookup.numLookupLoadingThreads`|Number of threads for loading the 
lookups in parallel on startup. This thread pool is destroyed once startup is 
done. It is not kept during the lifetime of the JVM|Available Processors / 2|
 |`druid.lookup.coordinatorFetchRetries`|How many times to retry to fetch the 
lookup bean list from coordinator, during the sync on startup.|3|
 |`druid.lookup.lookupStartRetries`|How many times to retry to start each 
lookup, either during the sync on startup, or during the runtime.|3|
+|`druid.lookup.coordinatorRetryDelay`|How long to delay (in millis) between 
retries to fetch lookup list from the coordinator during the sync on 
startup.|60_000|
 
 ## Introspect a Lookup
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java 
b/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
index fd2dc84b547..6746e08b4b3 100644
--- a/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupConfig.java
@@ -28,6 +28,7 @@
 
 public class LookupConfig
 {
+  static int DEFAULT_COORDINATOR_RETRY_DELAY = 60_000;
 
   @JsonProperty("snapshotWorkingDir")
   private String snapshotWorkingDir;
@@ -43,6 +44,13 @@
   @JsonProperty("coordinatorFetchRetries")
   private int coordinatorFetchRetries = 3;
 
+  // By default, add an extra minute in addition to the retry wait. In 
RetryUtils, retry wait starts from a few
+  // seconds, that is likely not enough to coordinator to be back to healthy 
state, e. g. if it experiences
+  // 30-second GC pause.
+  @Min(0)
+  @JsonProperty("coordinatorRetryDelay")
+  private int coordinatorRetryDelay = DEFAULT_COORDINATOR_RETRY_DELAY;
+
   @Min(1)
   @JsonProperty("lookupStartRetries")
   private int lookupStartRetries = 3;
@@ -84,6 +92,11 @@ public int getLookupStartRetries()
     return lookupStartRetries;
   }
 
+  public int getCoordinatorRetryDelay()
+  {
+    return coordinatorRetryDelay;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -100,7 +113,8 @@ public boolean equals(Object o)
            enableLookupSyncOnStartup == that.enableLookupSyncOnStartup &&
            numLookupLoadingThreads == that.numLookupLoadingThreads &&
            coordinatorFetchRetries == that.coordinatorFetchRetries &&
-           lookupStartRetries == that.lookupStartRetries;
+           lookupStartRetries == that.lookupStartRetries &&
+           coordinatorRetryDelay == that.coordinatorRetryDelay;
   }
 
   @Override
@@ -111,7 +125,8 @@ public int hashCode()
         enableLookupSyncOnStartup,
         numLookupLoadingThreads,
         coordinatorFetchRetries,
-        lookupStartRetries
+        lookupStartRetries,
+        coordinatorRetryDelay
     );
   }
 
@@ -124,6 +139,7 @@ public String toString()
            ", numLookupLoadingThreads=" + numLookupLoadingThreads +
            ", coordinatorFetchRetries=" + coordinatorFetchRetries +
            ", lookupStartRetries=" + lookupStartRetries +
+           ", coordinatorRetryDelay=" + coordinatorRetryDelay +
            '}';
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java 
b/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
index 6c268d92103..ca18913ab7b 100644
--- 
a/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/lookup/LookupConfigTest.java
@@ -53,7 +53,8 @@ public void testSerdeWithNonDefaults() throws Exception
                   + "  \"snapshotWorkingDir\": \"/tmp\",\n"
                   + "  \"numLookupLoadingThreads\": 4,\n"
                   + "  \"coordinatorFetchRetries\": 4,\n"
-                  + "  \"lookupStartRetries\": 4 \n"
+                  + "  \"lookupStartRetries\": 4,\n"
+                  + "  \"coordinatorRetryDelay\": 100 \n"
                   + "}\n";
     LookupConfig config = mapper.readValue(
         mapper.writeValueAsString(
@@ -67,5 +68,6 @@ public void testSerdeWithNonDefaults() throws Exception
     Assert.assertEquals(4, config.getNumLookupLoadingThreads());
     Assert.assertEquals(4, config.getCoordinatorFetchRetries());
     Assert.assertEquals(4, config.getLookupStartRetries());
+    Assert.assertEquals(100, config.getCoordinatorRetryDelay());
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
 
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index 8d97cbee253..6a6ca6cd5a4 100644
--- 
a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++ 
b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -389,11 +389,11 @@ private void loadAllLookupsAndInitStateRef()
           () -> {
             if (firstAttempt.isTrue()) {
               firstAttempt.setValue(false);
-            } else {
-              // Adding an extra minute in addition to the retry wait. In 
RetryUtils, retry wait starts from a few
-              // seconds, that is likely not enough to coordinator to be back 
to healthy state, e. g. if it experiences
-              // 30-second GC pause.
-              Thread.sleep(60_000);
+            } else if (lookupConfig.getCoordinatorRetryDelay() > 0) {
+              // Adding any configured extra time in addition to the retry 
wait. In RetryUtils, retry wait starts from
+              // a few seconds, that is likely not enough to coordinator to be 
back to healthy state, e. g. if it
+              // experiences 30-second GC pause. Default is 1 minute
+              Thread.sleep(lookupConfig.getCoordinatorRetryDelay());
             }
             return tryGetLookupListFromCoordinator(tier);
           },
diff --git 
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
 
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
index f155895a80e..5c0f16815e7 100644
--- 
a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
@@ -50,20 +50,15 @@
 
 public class LookupReferencesManagerTest
 {
-  LookupReferencesManager lookupReferencesManager;
-
-  private DruidLeaderClient druidLeaderClient;
-
-  private LookupListeningAnnouncerConfig config;
-
   private static final String LOOKUP_TIER = "lookupTier";
-
-  LookupExtractorFactory lookupExtractorFactory;
-
-  LookupExtractorFactoryContainer container;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  LookupReferencesManager lookupReferencesManager;
+  LookupExtractorFactory lookupExtractorFactory;
+  LookupExtractorFactoryContainer container;
   ObjectMapper mapper = new DefaultObjectMapper();
+  private DruidLeaderClient druidLeaderClient;
+  private LookupListeningAnnouncerConfig config;
 
   @Before
   public void setUp() throws IOException
@@ -85,7 +80,9 @@ public void setUp() throws IOException
     String temporaryPath = temporaryFolder.newFolder().getAbsolutePath();
     lookupReferencesManager = new LookupReferencesManager(
         new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
-        mapper, druidLeaderClient, config,
+        mapper,
+        druidLeaderClient,
+        config,
         true
     );
   }
@@ -104,7 +101,10 @@ public void testStartStop() throws InterruptedException, 
IOException
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -165,7 +165,10 @@ public void testAddGetRemove() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -203,7 +206,10 @@ public void testCloseIsCalledAfterStopping() throws 
Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -234,7 +240,10 @@ public void testDestroyIsCalledAfterRemove() throws 
Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -262,7 +271,10 @@ public void testGetNotThere() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -292,7 +304,10 @@ public void testUpdateWithHigherVersion() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -326,7 +341,10 @@ public void testUpdateWithLowerVersion() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -354,7 +372,10 @@ public void testRemoveNonExisting() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -405,7 +426,10 @@ public void testGetAllLookupsState() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -447,7 +471,10 @@ public void testRealModeWithMainThread() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -523,7 +550,10 @@ public void testCoordinatorLookupSync() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -543,13 +573,31 @@ public void testCoordinatorLookupSync() throws Exception
   @Test
   public void testLoadLookupOnCoordinatorFailure() throws Exception
   {
+    LookupConfig lookupConfig = new 
LookupConfig(temporaryFolder.newFolder().getAbsolutePath())
+    {
+      @Override
+      public int getCoordinatorRetryDelay()
+      {
+        return 10;
+      }
+    };
+    lookupReferencesManager = new LookupReferencesManager(
+        lookupConfig,
+        mapper,
+        druidLeaderClient,
+        config
+    );
+
     Map<String, Object> lookupMap = new HashMap<>();
     lookupMap.put("testMockForLoadLookupOnCoordinatorFailure", container);
     String strResult = mapper.writeValueAsString(lookupMap);
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request)
         .anyTimes();
     FullResponseHolder responseHolder = new FullResponseHolder(
@@ -564,16 +612,30 @@ public void testLoadLookupOnCoordinatorFailure() throws 
Exception
     lookupReferencesManager.add("testMockForLoadLookupOnCoordinatorFailure", 
container);
     lookupReferencesManager.handlePendingNotices();
     lookupReferencesManager.stop();
+    lookupConfig = new 
LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile(LOOKUP_TIER).getParent())
+    {
+      @Override
+      public int getCoordinatorRetryDelay()
+      {
+        return 10;
+      }
+    };
+
     lookupReferencesManager = new LookupReferencesManager(
-        new 
LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile(LOOKUP_TIER).getParent()),
-        mapper, druidLeaderClient, config,
+        lookupConfig,
+        mapper,
+        druidLeaderClient,
+        config,
         true
     );
     reset(config);
     reset(druidLeaderClient);
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request)
         .anyTimes();
     expect(druidLeaderClient.go(request)).andThrow(new 
IllegalStateException()).anyTimes();
@@ -585,9 +647,19 @@ public void testLoadLookupOnCoordinatorFailure() throws 
Exception
   @Test
   public void testDisableLookupSync() throws Exception
   {
+    LookupConfig lookupConfig = new LookupConfig(null)
+    {
+      @Override
+      public boolean getEnableLookupSyncOnStartup()
+      {
+        return false;
+      }
+    };
     LookupReferencesManager lookupReferencesManager = new 
LookupReferencesManager(
-        new LookupConfig(null),
-        mapper, druidLeaderClient, config
+        lookupConfig,
+        mapper,
+        druidLeaderClient,
+        config
     );
     Map<String, Object> lookupMap = new HashMap<>();
     lookupMap.put("testMockForDisableLookupSync", container);
@@ -595,7 +667,10 @@ public void testDisableLookupSync() throws Exception
     Request request = new Request(HttpMethod.GET, new 
URL("http://localhost:1234/xx"));
     expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
     replay(config);
-    expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
+    expect(druidLeaderClient.makeRequest(
+        HttpMethod.GET,
+        "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
+    ))
         .andReturn(request);
     FullResponseHolder responseHolder = new FullResponseHolder(
         HttpResponseStatus.OK,
@@ -607,5 +682,4 @@ public void testDisableLookupSync() throws Exception
     lookupReferencesManager.start();
     
Assert.assertNull(lookupReferencesManager.get("testMockForDisableLookupSync"));
   }
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to