kfaraz commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r2008644608


##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
       ).get();
 
       if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
-        log.info(
+        log.warn(
             "Historical capabilities endpoint not found at server[%s]. Using default values.",
-            new URL(serverId)
+            segmentLoadingCapabilitiesURL
         );
-        return new HistoricalLoadingCapabilities(1, 1);
-      }
-
-      if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        return new SegmentLoadingCapabilities(1, 1);
+      } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        log.makeAlert("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());

Review Comment:
   ```suggestion
           log.makeAlert("Received status[%s] when fetching loading capabilities from server[%s]", responseHandler.getStatus(), serverId);
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +93,17 @@ public SegmentLoadDropHandler(
         config,
         announcer,
         segmentManager,
-        Executors.newScheduledThreadPool(
+        new ScheduledThreadPoolExecutor(
             config.getNumLoadingThreads(),
-            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+            Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
+        ),
+        new ThreadPoolExecutor(
+            config.getNumBootstrapThreads(),

Review Comment:
   Use core size of 1.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
       ).get();
 
       if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
-        log.info(
+        log.warn(
             "Historical capabilities endpoint not found at server[%s]. Using default values.",

Review Comment:
   ```suggestion
               "Historical capabilities endpoint not found at URL[%s]. Using default values.",
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -82,29 +78,26 @@ public class HttpLoadQueuePeonTest
   public void setUp()
   {
     httpClient = new TestHttpClient();
-    JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
-    EasyMock.expect(
-        configManager.watch(
-            EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
-            EasyMock.anyObject(Class.class),
-            EasyMock.anyObject()
-        )
-    ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
-    EasyMock.replay(configManager);
     httpLoadQueuePeon = new HttpLoadQueuePeon(
         "http://dummy:4000",
         MAPPER,
         httpClient,
         new HttpLoadQueuePeonConfig(null, null, 10),
+        () -> SegmentLoadingMode.NORMAL,
         new WrappingScheduledExecutorService(
             "HttpLoadQueuePeonTest-%s",
             httpClient.processingExecutor,
             true
         ),
-        httpClient.callbackExecutor,
-        () -> SegmentLoadingMode.NORMAL,
-        new HistoricalLoadingCapabilities(1, 3)
-    );
+        httpClient.callbackExecutor
+    )
+    {
+      @Override
+      SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()

Review Comment:
   Non-blocker:
   Do we need to override this?
   Can't we make the `HttpClient` handle this API to return what we want?



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
       ).get();
 
       if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
-        log.info(
+        log.warn(
             "Historical capabilities endpoint not found at server[%s]. Using default values.",
-            new URL(serverId)
+            segmentLoadingCapabilitiesURL
         );
-        return new HistoricalLoadingCapabilities(1, 1);
-      }
-
-      if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        return new SegmentLoadingCapabilities(1, 1);

Review Comment:
   I wonder what we should do when a node is in turbo mode but the API call to fetch capabilities has failed.
   
   In that case, I think we should use the `batchSize` (for both normal and turbo modes) if it has been configured and is greater than 1.
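
A minimal sketch of that fallback, assuming the two constructor arguments of `SegmentLoadingCapabilities` are the normal and turbo values and that the configured `batchSize` may be absent (`null`). `FallbackCapabilities` and its field names are hypothetical stand-ins, not classes from this PR:

```java
/** Hypothetical stand-in for SegmentLoadingCapabilities; field names are assumptions. */
final class FallbackCapabilities
{
  final int normal;
  final int turbo;

  FallbackCapabilities(int normal, int turbo)
  {
    this.normal = normal;
    this.turbo = turbo;
  }

  /**
   * Values to use when the capabilities fetch fails: the configured batch size
   * for both modes if it is set and greater than 1, otherwise the default of 1.
   */
  static FallbackCapabilities onFetchFailure(Integer configuredBatchSize)
  {
    final int size = (configuredBatchSize != null && configuredBatchSize > 1)
                     ? configuredBatchSize
                     : 1;
    return new FallbackCapabilities(size, size);
  }
}
```

With this shape, a server that never answers the capabilities call still gets batches of the operator-configured size instead of silently dropping to 1.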



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
       ).get();
 
       if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
-        log.info(
+        log.warn(
             "Historical capabilities endpoint not found at server[%s]. Using default values.",
-            new URL(serverId)
+            segmentLoadingCapabilitiesURL
         );
-        return new HistoricalLoadingCapabilities(1, 1);
-      }
-
-      if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        return new SegmentLoadingCapabilities(1, 1);
+      } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        log.makeAlert("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());

Review Comment:
   Why do we need to use `URL(serverId)` instead of just `serverId`?



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +93,17 @@ public SegmentLoadDropHandler(
         config,
         announcer,
         segmentManager,
-        Executors.newScheduledThreadPool(
+        new ScheduledThreadPoolExecutor(

Review Comment:
   This can remain as before:
   ```suggestion
           Executors.newScheduledThreadPool(
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -271,7 +296,16 @@ public ListenableFuture<List<DataSegmentChangeResponse>> 
processBatch(List<DataS
     return future;
   }
 
-  private AtomicReference<SegmentChangeStatus> processRequest(DataSegmentChangeRequest changeRequest)
+  /**
+   * Process a {@link DataSegmentChangeRequest}, invoking the request's
+   * {@link DataSegmentChangeRequest#go(DataSegmentChangeHandler, DataSegmentChangeCallback)}.
+   * The segmentLoadingMode parameter determines the thread pool to use.
+   * Returns an atomic reference to the segment status.

Review Comment:
   This line can be skipped.
   ```suggestion
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +93,17 @@ public SegmentLoadDropHandler(
         config,
         announcer,
         segmentManager,
-        Executors.newScheduledThreadPool(
+        new ScheduledThreadPoolExecutor(
             config.getNumLoadingThreads(),
-            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+            Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
+        ),
+        new ThreadPoolExecutor(
+            config.getNumBootstrapThreads(),
+            config.getNumBootstrapThreads(),
+            60L,
+            TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")

Review Comment:
   This can be moved to `Execs` as a utility method to reuse in the code.
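
Combined with the "core size of 1" comment on the same constructor, such a utility might look roughly like the sketch below. `TurboExecs` and `newOnDemandPool` are hypothetical names, not existing `Execs` methods; only the `java.util.concurrent` calls are real:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of the actual Druid Execs class. */
final class TurboExecs
{
  private TurboExecs()
  {
  }

  /**
   * Creates a pool with a single core thread that can grow to maxThreads on demand.
   * A SynchronousQueue does not buffer tasks: each task is handed to an idle thread
   * or a newly created one, and is rejected once maxThreads are all busy.
   * Threads beyond the core one are reclaimed after keepAliveSeconds of idleness.
   */
  static ThreadPoolExecutor newOnDemandPool(int maxThreads, long keepAliveSeconds, ThreadFactory factory)
  {
    return new ThreadPoolExecutor(
        1,                        // core pool size
        maxThreads,               // must be >= 1, else IllegalArgumentException
        keepAliveSeconds,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        factory
    );
  }
}
```

A caller would then pass something like `config.getNumBootstrapThreads()` as `maxThreads` together with the existing thread factory.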



##########
server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java:
##########
@@ -65,6 +66,7 @@ public void testSerde() throws Exception
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+    ImmutableSet<String> turboLoading = ImmutableSet.of("host1", "host3");

Review Comment:
   ```suggestion
       ImmutableSet<String> turboLoadingNodes = ImmutableSet.of("host1", "host3");
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -100,13 +113,17 @@ public SegmentLoadDropHandler(
       SegmentLoaderConfig config,
       DataSegmentAnnouncer announcer,
       SegmentManager segmentManager,
-      ScheduledExecutorService exec
+      ScheduledExecutorService normalLoadExec,
+      ThreadPoolExecutor turboLoadExec
   )
   {
     this.config = config;
     this.announcer = announcer;
     this.segmentManager = segmentManager;
-    this.exec = exec;
+    this.normalLoadExec = normalLoadExec;
+    this.turboLoadExec = turboLoadExec;
+
+    this.turboLoadExec.allowCoreThreadTimeOut(true);

Review Comment:
   We don't need this if we use core pool size of 1.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,45 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/loadCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)
+      ).get();

Review Comment:
   `.get()` should probably have a timeout.
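
For illustration, a bounded wait on any `java.util.concurrent.Future` could be sketched as below. `BoundedGet`, `getOrDefault` and `FETCH_TIMEOUT_MILLIS` are hypothetical names, and returning a default on failure is an assumption about the desired behavior, not what the PR does:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper showing Future.get with a timeout. */
final class BoundedGet
{
  /** Hypothetical constant; 10 s mirrors the Duration(10000) passed to the request. */
  static final long FETCH_TIMEOUT_MILLIS = 10_000L;

  private BoundedGet()
  {
  }

  /** Waits at most timeoutMillis for the result, returning defaultValue on any failure. */
  static <T> T getOrDefault(Future<T> future, long timeoutMillis, T defaultValue)
  {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
      // Stop waiting and ask the underlying task to stop as well.
      future.cancel(true);
      return defaultValue;
    }
    catch (InterruptedException e) {
      // Restore the interrupt flag so callers can observe it.
      Thread.currentThread().interrupt();
      return defaultValue;
    }
    catch (ExecutionException e) {
      return defaultValue;
    }
  }
}
```

Without the timeout, a Historical that accepts the connection but never responds could block the constructor of the peon indefinitely.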



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
       ).get();
 
       if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
-        log.info(
+        log.warn(
             "Historical capabilities endpoint not found at server[%s]. Using default values.",

Review Comment:
   It would be nice to log the values being used as well.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,45 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/loadCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)

Review Comment:
   This should be a constant in this class.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -223,7 +196,12 @@ private void doSegmentManagement()
     }
 
     final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
-    int batchSize = calculateBatchSize(loadingMode);
+    final int batchSize = calculateBatchSize(loadingMode);
+
+    if (batchSize < 1) {
+      log.error("Batch size must be greater than 0.");
+      throw new RE("Batch size must be greater than 0.");
+    }

Review Comment:
   We should retain the validation in this class, but let's not throw an exception here.
   The validation can move to the `calculateBatchSize` method so that it always returns a positive integer.
   
   Also, please ensure that `SegmentLoaderConfig` validates `numLoadingThreads` and `numBootstrapThreads`, and that `HttpLoadQueuePeonConfig` validates `batchSize`.
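
As a rough sketch of both halves of this comment: a clamp that the batch-size calculation could apply before returning, and a fail-fast check that the config classes could run at construction time. `BatchSizeGuard` and its method names are hypothetical, and the real body of `calculateBatchSize` is not shown in this diff:

```java
/** Hypothetical helpers; names are illustrative only. */
final class BatchSizeGuard
{
  static final int MIN_BATCH_SIZE = 1;

  private BatchSizeGuard()
  {
  }

  /** Clamps a computed batch size so that callers always see a positive value. */
  static int toPositiveBatchSize(int candidate)
  {
    return Math.max(MIN_BATCH_SIZE, candidate);
  }

  /** Config-time validation: rejects non-positive values up front instead of at load time. */
  static int checkPositive(String propertyName, int value)
  {
    if (value < 1) {
      throw new IllegalArgumentException(propertyName + " must be greater than 0, got " + value);
    }
    return value;
  }
}
```

Clamping keeps the segment management loop running even with a bad value, while the config-time check surfaces the misconfiguration when the service starts.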


