kfaraz commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r2008644608
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
).get();
if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
- log.info(
+ log.warn(
"Historical capabilities endpoint not found at server[%s]. Using default values.",
- new URL(serverId)
+ segmentLoadingCapabilitiesURL
);
- return new HistoricalLoadingCapabilities(1, 1);
- }
-
- if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ return new SegmentLoadingCapabilities(1, 1);
+ } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ log.makeAlert("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());
Review Comment:
```suggestion
log.makeAlert("Received status[%s] when fetching loading capabilities from server[%s]", responseHandler.getStatus(), serverId);
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +93,17 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
- Executors.newScheduledThreadPool(
+ new ScheduledThreadPoolExecutor(
config.getNumLoadingThreads(),
- Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+ Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
+ ),
+ new ThreadPoolExecutor(
+ config.getNumBootstrapThreads(),
Review Comment:
Use core size of 1.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
).get();
if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
- log.info(
+ log.warn(
"Historical capabilities endpoint not found at server[%s]. Using default values.",
Review Comment:
```suggestion
"Historical capabilities endpoint not found at URL[%s]. Using default values.",
```
##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -82,29 +78,26 @@ public class HttpLoadQueuePeonTest
public void setUp()
{
httpClient = new TestHttpClient();
- JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
- EasyMock.expect(
- configManager.watch(
- EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
- EasyMock.anyObject(Class.class),
- EasyMock.anyObject()
- )
- ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
- EasyMock.replay(configManager);
httpLoadQueuePeon = new HttpLoadQueuePeon(
"http://dummy:4000",
MAPPER,
httpClient,
new HttpLoadQueuePeonConfig(null, null, 10),
+ () -> SegmentLoadingMode.NORMAL,
new WrappingScheduledExecutorService(
"HttpLoadQueuePeonTest-%s",
httpClient.processingExecutor,
true
),
- httpClient.callbackExecutor,
- () -> SegmentLoadingMode.NORMAL,
- new HistoricalLoadingCapabilities(1, 3)
- );
+ httpClient.callbackExecutor
+ )
+ {
+ @Override
+ SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
Review Comment:
Non-blocker:
Do we need to override this?
Can't we make the `HttpClient` handle this API to return what we want?
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
).get();
if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
- log.info(
+ log.warn(
"Historical capabilities endpoint not found at server[%s]. Using default values.",
- new URL(serverId)
+ segmentLoadingCapabilitiesURL
);
- return new HistoricalLoadingCapabilities(1, 1);
- }
-
- if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ return new SegmentLoadingCapabilities(1, 1);
Review Comment:
I wonder what we should do in a case where a node is in turbo-mode but the
API call to fetch capabilities has failed.
In that case, I feel that we should use the `batchSize` (for both normal and
turbo) if it has been configured and is higher than 1.
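One hedged way to express that fallback (class and method names here are illustrative, not from the PR):

```java
// Sketch of the suggested fallback: when the capabilities API call has failed,
// use the configured batchSize (for both normal and turbo) if it is set and
// greater than 1, otherwise fall back to the conservative default of 1.
public class BatchSizeFallback
{
  public static int fallbackBatchSize(Integer configuredBatchSize)
  {
    if (configuredBatchSize != null && configuredBatchSize > 1) {
      // Honor the operator's configured value even though capabilities are unknown.
      return configuredBatchSize;
    }
    return 1;
  }

  public static void main(String[] args)
  {
    System.out.println(fallbackBatchSize(null));
    System.out.println(fallbackBatchSize(5));
  }
}
```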
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
).get();
if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
- log.info(
+ log.warn(
"Historical capabilities endpoint not found at server[%s]. Using default values.",
- new URL(serverId)
+ segmentLoadingCapabilitiesURL
);
- return new HistoricalLoadingCapabilities(1, 1);
- }
-
- if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ return new SegmentLoadingCapabilities(1, 1);
+ } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ log.makeAlert("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());
Review Comment:
Why do we need to use `URL(serverId)` instead of just `serverId`?
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +93,17 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
- Executors.newScheduledThreadPool(
+ new ScheduledThreadPoolExecutor(
Review Comment:
This can remain as before:
```suggestion
Executors.newScheduledThreadPool(
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -271,7 +296,16 @@ public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataS
return future;
}
- private AtomicReference<SegmentChangeStatus> processRequest(DataSegmentChangeRequest changeRequest)
+ /**
+ * Process a {@link DataSegmentChangeRequest}, invoking the request's
+ * {@link DataSegmentChangeRequest#go(DataSegmentChangeHandler, DataSegmentChangeCallback)}.
+ * The segmentLoadingMode parameter determines the thread pool to use.
+ * Returns an atomic reference to the segment status.
Review Comment:
This line can be skipped.
```suggestion
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +93,17 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
- Executors.newScheduledThreadPool(
+ new ScheduledThreadPoolExecutor(
config.getNumLoadingThreads(),
- Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+ Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
+ ),
+ new ThreadPoolExecutor(
+ config.getNumBootstrapThreads(),
+ config.getNumBootstrapThreads(),
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")
Review Comment:
This can be moved to `Execs` as a utility method to reuse in the code.
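A sketch of what such an `Execs` helper might look like (the method name and signature are assumptions, not the actual Druid API), using core size 1 as suggested in the earlier comment:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecsSketch
{
  // Hypothetical utility: a bounded pool with core size 1 (so
  // allowCoreThreadTimeOut is unnecessary), idle threads reclaimed
  // after 60s, and direct handoff via SynchronousQueue so tasks are
  // never queued behind each other.
  public static ThreadPoolExecutor directHandoffPool(int maxThreads, ThreadFactory factory)
  {
    return new ThreadPoolExecutor(
        1,
        maxThreads,
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        factory
    );
  }

  public static void main(String[] args) throws Exception
  {
    ThreadPoolExecutor pool = directHandoffPool(4, Thread::new);
    pool.submit(() -> System.out.println("task ran"));
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

With a SynchronousQueue, a submitted task is rejected rather than queued once all `maxThreads` workers are busy, which is worth keeping in mind if callers do not expect `RejectedExecutionException`.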
##########
server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java:
##########
@@ -65,6 +66,7 @@ public void testSerde() throws Exception
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+ ImmutableSet<String> turboLoading = ImmutableSet.of("host1", "host3");
Review Comment:
```suggestion
ImmutableSet<String> turboLoadingNodes = ImmutableSet.of("host1", "host3");
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -100,13 +113,17 @@ public SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
SegmentManager segmentManager,
- ScheduledExecutorService exec
+ ScheduledExecutorService normalLoadExec,
+ ThreadPoolExecutor turboLoadExec
)
{
this.config = config;
this.announcer = announcer;
this.segmentManager = segmentManager;
- this.exec = exec;
+ this.normalLoadExec = normalLoadExec;
+ this.turboLoadExec = turboLoadExec;
+
+ this.turboLoadExec.allowCoreThreadTimeOut(true);
Review Comment:
We don't need this if we use core pool size of 1.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,45 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/loadCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
Review Comment:
`.get()` should probably have a timeout.
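A minimal sketch of the bounded-wait pattern being suggested, using only `java.util.concurrent` (the fallback value and timeout are placeholders, not values from the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch
{
  // Bound the wait on an async result; on timeout or failure, return a
  // caller-supplied fallback instead of blocking indefinitely.
  public static <T> T getWithTimeout(Future<T> future, long timeoutMillis, T fallback)
  {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException | ExecutionException e) {
      // In real code, log the failure before falling back.
      return fallback;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return fallback;
    }
  }

  public static void main(String[] args)
  {
    // A future that never completes: the timed get returns the fallback.
    Future<String> never = new CompletableFuture<>();
    System.out.println(getWithTimeout(never, 100, "default"));
  }
}
```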
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -192,26 +168,23 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
).get();
if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
- log.info(
+ log.warn(
"Historical capabilities endpoint not found at server[%s]. Using default values.",
Review Comment:
It would be nice to log the values being used as well.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,45 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/loadCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
Review Comment:
This should be a constant in this class.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -223,7 +196,12 @@ private void doSegmentManagement()
}
final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
- int batchSize = calculateBatchSize(loadingMode);
+ final int batchSize = calculateBatchSize(loadingMode);
+
+ if (batchSize < 1) {
+ log.error("Batch size must be greater than 0.");
+ throw new RE("Batch size must be greater than 0.");
+ }
Review Comment:
We should retain the validation in this class but let's not throw an
exception here.
The validation can move to the `calculateBatchSize` method so that it always
returns a positive integer.
Also, please ensure that there are required validations in
`SegmentLoadingConfig` and `HttpLoadQueuePeonConfig` around `numLoadingThreads`, `numBootstrapThreads`, and `batchSize` respectively.
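An illustrative version of `calculateBatchSize` along these lines (the method shape is assumed; in the real class the mode-specific sizes would come from the fetched capabilities):

```java
public class CalculateBatchSizeSketch
{
  // Clamp the computed batch size to a positive value instead of throwing,
  // so callers can rely on the result always being usable.
  public static int calculateBatchSize(int configuredSize)
  {
    if (configuredSize < 1) {
      // In real code, log a warning here; fall back to the minimum usable size.
      return 1;
    }
    return configuredSize;
  }

  public static void main(String[] args)
  {
    System.out.println(calculateBatchSize(0));
    System.out.println(calculateBatchSize(8));
  }
}
```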
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]