This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 08af98c73bf Add the capability to turboload segments onto historicals
(#17775)
08af98c73bf is described below
commit 08af98c73bf7f35e4c7e2b136a63158d11b02348
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon Mar 24 20:46:44 2025 +0530
Add the capability to turboload segments onto historicals (#17775)
Add the capability to set Historicals into a turbo loading mode,
to focus on loading segments at the cost of query performance.
Context
--------
Currently, when a new Historical is started, it initially uses a bootstrap
thread pool to load any existing cached segments and broadcast segments.
Once it has loaded the segments from both these sources, the Historical
switches to a smaller thread pool and begins to serve queries.
In certain cases, it would be useful to have the Historical switch back to
this mode and focus on loading segments, either to continue loading the
initial non-bootstrap segments or to catch up with assigned segments.
This PR adds a Coordinator dynamic config that allows servers to be
configured to use the larger bootstrap thread pool to load segments faster.
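As a rough illustration of the idea described above, the following standalone sketch keeps two pools and picks one per request based on the loading mode. It is loosely modelled on the `SegmentLoadDropHandler` changes in this commit, but the class `LoadingPoolsSketch`, its nested `Mode` enum, and the method names are hypothetical and exist only for this example. The pool sizes are plain constructor arguments here rather than values read from `SegmentLoaderConfig`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: not the actual Druid class.
final class LoadingPoolsSketch {
    enum Mode { NORMAL, TURBO }

    private final ScheduledThreadPoolExecutor normalPool;
    private final ThreadPoolExecutor turboPool;

    LoadingPoolsSketch(int numLoadingThreads, int numBootstrapThreads) {
        // Smaller pool used while the server is also serving queries.
        normalPool = new ScheduledThreadPoolExecutor(numLoadingThreads);

        // Larger fixed-size pool. All threads are core threads, so new threads
        // are started for incoming tasks until the pool reaches its size.
        turboPool = new ThreadPoolExecutor(
            numBootstrapThreads,
            numBootstrapThreads,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        // Let idle core threads exit after the keep-alive time, so the larger
        // pool does not hold threads when turbo mode is not in use.
        turboPool.allowCoreThreadTimeOut(true);
    }

    // Chooses the pool that should run a load request for the given mode.
    ExecutorService poolFor(Mode mode) {
        return mode == Mode.TURBO ? turboPool : normalPool;
    }

    int turboPoolSize() {
        return turboPool.getCorePoolSize();
    }

    void shutdown() {
        normalPool.shutdownNow();
        turboPool.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        LoadingPoolsSketch pools = new LoadingPoolsSketch(2, 8);
        try {
            // Submit a trivial task to the turbo pool and wait for it.
            pools.poolFor(Mode.TURBO).submit(() -> System.out.println("loaded in turbo mode")).get();
        } finally {
            pools.shutdown();
        }
    }
}
```

Because no thread is started until a task is submitted, a server that never enters turbo mode pays nothing for the second pool in this sketch.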
Changes
---------
- Add a new Coordinator dynamic configuration, `turboLoadingNodes`.
- Ignore `druid.coordinator.loadqueuepeon.http.batchSize` for servers in
`turboLoadingNodes`.
- Add an API on Historicals that returns their loading capabilities, i.e. the
number of loading threads in normal and turbo mode.
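The interaction between `turboLoadingNodes` and the batch size can be sketched roughly as follows. This is a simplified, standalone approximation of the logic in the diff below (`getLoadingModeForServer` and `calculateBatchSize`). The class `TurboBatchSizeSketch`, its method names, and the host names are hypothetical, and the thread counts are passed in directly instead of being fetched from the Historical.

```java
import java.util.Set;

// Hypothetical sketch: not the actual Druid classes.
final class TurboBatchSizeSketch {
    enum Mode { NORMAL, TURBO }

    // A server is in turbo mode only if it is listed in turboLoadingNodes.
    static Mode modeFor(Set<String> turboLoadingNodes, String server) {
        return turboLoadingNodes.contains(server) ? Mode.TURBO : Mode.NORMAL;
    }

    // In turbo mode the configured batch size is ignored and the server's
    // turbo thread count is used. Otherwise the configured batch size wins,
    // falling back to the server's normal thread count when it is not set.
    static int batchSizeFor(
        Mode mode,
        Integer configuredBatchSize,   // null when the property is not set
        int numLoadingThreads,
        int numTurboLoadingThreads
    ) {
        final int batchSize;
        if (mode == Mode.TURBO) {
            batchSize = numTurboLoadingThreads;
        } else {
            batchSize = configuredBatchSize != null ? configuredBatchSize : numLoadingThreads;
        }
        // Never return less than one request per batch.
        return Math.max(batchSize, 1);
    }

    public static void main(String[] args) {
        Set<String> turboNodes = Set.of("historical-1:8083");
        for (String server : new String[]{"historical-1:8083", "historical-2:8083"}) {
            Mode mode = modeFor(turboNodes, server);
            System.out.println(server + " -> " + mode + ", batchSize=" + batchSizeFor(mode, 2, 4, 16));
        }
    }
}
```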
---
docs/api-reference/dynamic-configuration-api.md | 11 ++-
docs/configuration/index.md | 3 +-
.../coordination/DataSegmentChangeHandler.java | 1 +
.../coordination/SegmentLoadDropHandler.java | 77 +++++++++++++++---
.../coordinator/CoordinatorDynamicConfig.java | 50 ++++++++++--
.../config/HttpLoadQueuePeonConfig.java | 18 ++++-
.../coordinator/loading/HttpLoadQueuePeon.java | 93 +++++++++++++++++++---
.../coordinator/loading/LoadQueueTaskMaster.java | 17 +++-
.../druid/server/http/SegmentListerResource.java | 31 ++++++--
.../server/http/SegmentLoadingCapabilities.java | 64 +++++++++++++++
.../SegmentLoadingMode.java} | 13 ++-
.../coordination/SegmentLoadDropHandlerTest.java | 23 +++---
.../coordinator/DruidCoordinatorConfigTest.java | 4 +-
.../config/HttpLoadQueuePeonConfigTest.java | 62 +++++++++++++++
.../coordinator/loading/HttpLoadQueuePeonTest.java | 43 +++++++++-
.../simulate/CoordinatorSimulationBuilder.java | 11 ++-
.../simulate/TestSegmentLoadingHttpClient.java | 27 +++++++
.../server/http/CoordinatorDynamicConfigTest.java | 78 +++++++++++++-----
.../http/SegmentLoadingCapabilitiesTest.java | 52 ++++++++++++
.../java/org/apache/druid/cli/CliCoordinator.java | 6 +-
20 files changed, 589 insertions(+), 95 deletions(-)
diff --git a/docs/api-reference/dynamic-configuration-api.md
b/docs/api-reference/dynamic-configuration-api.md
index 90b7028f247..971aa81d206 100644
--- a/docs/api-reference/dynamic-configuration-api.md
+++ b/docs/api-reference/dynamic-configuration-api.md
@@ -105,7 +105,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
"maxNonPrimaryReplicantsToLoad": 2147483647,
"useRoundRobinSegmentAssignment": true,
"smartSegmentLoading": true,
- "debugDimensions": null
+ "debugDimensions": null,
+ "turboLoadingNodes": []
}
```
@@ -172,7 +173,8 @@ curl
"http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \
"pauseCoordination": false,
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647,
- "useRoundRobinSegmentAssignment": true
+ "useRoundRobinSegmentAssignment": true,
+ "turboLoadingNodes": []
}'
```
@@ -203,7 +205,8 @@ Content-Length: 683
"pauseCoordination": false,
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647,
- "useRoundRobinSegmentAssignment": true
+ "useRoundRobinSegmentAssignment": true,
+ "turboLoadingNodes": []
}
```
@@ -289,7 +292,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"comment": "",
"ip": "127.0.0.1"
},
- "payload":
"{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignme
[...]
+ "payload":
"{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignme
[...]
"auditTime": "2023-10-03T20:59:51.622Z"
}
]
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index d75309b7ac3..687e1d688a6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in
the `coordinator/runti
|`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601
duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to
delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only
applies when `druid.coordinator.kill.on=true`.|`P30D`|
|`druid.coordinator.balancer.strategy`|The [balancing
strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the
Coordinator to distribute segments among the Historical servers in a tier. The
`cost` strategy distributes segments by minimizing a cost function,
`diskNormalized` weights these costs with the disk usage ratios of the servers
and `random` distributes segments randomly.|`cost`|
|`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay
(in milliseconds) for the load queue peon, which manages the load/drop queue of
segments for any server.|1 minute|
-|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop
requests to batch in one HTTP request. Note that it must be smaller than
`druid.segmentCache.numLoadingThreads` config on Historical service.|1|
+|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop
requests to batch in one HTTP request. Note that it must be smaller than or
equal to the `druid.segmentCache.numLoadingThreads` config on Historical
service. If this value is not configured, the coordinator uses the value of the
`numLoadingThreads` for the respective server. |
`druid.segmentCache.numLoadingThreads` |
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this
Coordinator service should act like an Overlord as well. This configuration
allows users to simplify a Druid cluster by not having to deploy any standalone
Overlord services. If set to true, then Overlord console is available at
`http://coordinator-host:port/console.html` and be sure to set
`druid.coordinator.asOverlord.overlordService` also.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if
`druid.coordinator.asOverlord.enabled` is `true`. This must be same value as
`druid.service` on standalone Overlord services and
`druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
@@ -953,6 +953,7 @@ The following table shows the dynamic configuration
properties for the Coordinat
|`decommissioningNodes`|List of Historical servers to decommission.
Coordinator will not assign new segments to decommissioning servers, and
segments will be moved away from them to be placed on non-decommissioning
servers at the maximum rate specified by `maxSegmentsToMove`.|none|
|`pauseCoordination`|Boolean flag for whether or not the Coordinator should
execute its various duties of coordinating the cluster. Setting this to true
essentially pauses all coordination work while allowing the API to remain up.
Duties that are paused include all classes that implement the `CoordinatorDuty`
interface. Such duties include: segment balancing, segment compaction,
submitting kill tasks for unused segments (if enabled), logging of used
segments in the cluster, marking of ne [...]
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional
replication is needed for segments that have failed to load due to the expiry
of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator
will attempt to replicate the failed segment on a different historical server.
This helps improve the segment availability if there are a few slow Historicals
in the cluster. However, the slow Historical may still load the segment later
and the Coordinator may issu [...]
+|`turboLoadingNodes`| List of Historical servers to place in turbo loading
mode. These servers use a larger thread-pool to load segments faster but at the
cost of query performance. For servers specified in `turboLoadingNodes`,
`druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator
uses the value of the respective `numLoadingThreads` instead. |none|
##### Smart segment loading
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
index cd2a8c3740f..70758cd63ab 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
@@ -28,5 +28,6 @@ import javax.annotation.Nullable;
public interface DataSegmentChangeHandler
{
void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback
callback);
+
void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback
callback);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 12462adab2f..4d3e22c1a13 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;
@@ -44,8 +45,11 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -63,7 +67,8 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final SegmentManager segmentManager;
- private final ScheduledExecutorService exec;
+ private final ScheduledExecutorService normalLoadExec;
+ private final ThreadPoolExecutor turboLoadExec;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
@@ -88,9 +93,19 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
config,
announcer,
segmentManager,
- Executors.newScheduledThreadPool(
+ new ScheduledThreadPoolExecutor(
config.getNumLoadingThreads(),
- Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+ Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
+ ),
+ // Create a fixed size threadpool which has a timeout of 1 minute.
Since they are all core threads, new threads
+ // will be created without enqueuing the tasks till the capacity is
reached.
+ new ThreadPoolExecutor(
+ config.getNumBootstrapThreads(),
+ config.getNumBootstrapThreads(),
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")
)
);
}
@@ -100,13 +115,17 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
SegmentManager segmentManager,
- ScheduledExecutorService exec
+ ScheduledExecutorService normalLoadExec,
+ ThreadPoolExecutor turboLoadExec
)
{
this.config = config;
this.announcer = announcer;
this.segmentManager = segmentManager;
- this.exec = exec;
+ this.normalLoadExec = normalLoadExec;
+ this.turboLoadExec = turboLoadExec;
+
+ this.turboLoadExec.allowCoreThreadTimeOut(true);
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses =
CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
@@ -214,7 +233,7 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
"Completely removing segment[%s] in [%,d]ms.",
segment.getId(), config.getDropSegmentDelayMillis()
);
- exec.schedule(
+ normalLoadExec.schedule(
runnable,
config.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
@@ -244,14 +263,22 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return ImmutableList.copyOf(segmentsToDelete);
}
- public ListenableFuture<List<DataSegmentChangeResponse>>
processBatch(List<DataSegmentChangeRequest> changeRequests)
+ /**
+ * Process a list of {@link DataSegmentChangeRequest}, invoking
+ * {@link #processRequest(DataSegmentChangeRequest, SegmentLoadingMode)} for
each one. Handles the computation
+ * asynchronously and returns a future to the result.
+ */
+ public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(
+ List<DataSegmentChangeRequest> changeRequests,
+ SegmentLoadingMode segmentLoadingMode
+ )
{
boolean isAnyRequestDone = false;
Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>>
statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
- AtomicReference<SegmentChangeStatus> status = processRequest(cr);
+ AtomicReference<SegmentChangeStatus> status = processRequest(cr,
segmentLoadingMode);
if (status.get().getState() != SegmentChangeStatus.State.PENDING) {
isAnyRequestDone = true;
}
@@ -271,7 +298,15 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return future;
}
- private AtomicReference<SegmentChangeStatus>
processRequest(DataSegmentChangeRequest changeRequest)
+ /**
+ * Process a {@link DataSegmentChangeRequest}, invoking the request's
+ * {@link DataSegmentChangeRequest#go(DataSegmentChangeHandler,
DataSegmentChangeCallback)}.
+ * The segmentLoadingMode parameter determines the thread pool to use.
+ */
+ private AtomicReference<SegmentChangeStatus> processRequest(
+ DataSegmentChangeRequest changeRequest,
+ SegmentLoadingMode segmentLoadingMode
+ )
{
synchronized (requestStatusesLock) {
AtomicReference<SegmentChangeStatus> status =
requestStatuses.getIfPresent(changeRequest);
@@ -282,10 +317,13 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
new DataSegmentChangeHandler()
{
@Override
- public void addSegment(DataSegment segment, @Nullable
DataSegmentChangeCallback callback)
+ public void addSegment(
+ DataSegment segment,
+ @Nullable DataSegmentChangeCallback callback
+ )
{
requestStatuses.put(changeRequest, new
AtomicReference<>(SegmentChangeStatus.PENDING));
- exec.submit(
+ getExecutorService(segmentLoadingMode).submit(
() -> SegmentLoadDropHandler.this.addSegment(
((SegmentChangeRequestLoad)
changeRequest).getSegment(),
() -> resolveWaitingFutures()
@@ -294,7 +332,10 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
@Override
- public void removeSegment(DataSegment segment, @Nullable
DataSegmentChangeCallback callback)
+ public void removeSegment(
+ DataSegment segment,
+ @Nullable DataSegmentChangeCallback callback
+ )
{
requestStatuses.put(changeRequest, new
AtomicReference<>(SegmentChangeStatus.PENDING));
SegmentLoadDropHandler.this.removeSegment(
@@ -386,5 +427,15 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return true;
}
}
+
+ private ExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+ {
+ return loadingMode == SegmentLoadingMode.TURBO ? turboLoadExec :
normalLoadExec;
+ }
+
+ public SegmentLoaderConfig getSegmentLoaderConfig()
+ {
+ return config;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d805bad5e01..806b6ebbee1 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
@@ -70,6 +72,8 @@ public class CoordinatorDynamicConfig
private final Map<String, String> debugDimensions;
private final Map<Dimension, String> validDebugDimensions;
+ private final Set<String> turboLoadingNodes;
+
/**
* Stale pending segments belonging to the data sources in this list are not
killed by {@code
* KillStalePendingSegments}. In other words, segments in these data sources
are "protected".
@@ -118,7 +122,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("replicateAfterLoadTimeout") boolean
replicateAfterLoadTimeout,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean
useRoundRobinSegmentAssignment,
@JsonProperty("smartSegmentLoading") @Nullable Boolean
smartSegmentLoading,
- @JsonProperty("debugDimensions") @Nullable Map<String, String>
debugDimensions
+ @JsonProperty("debugDimensions") @Nullable Map<String, String>
debugDimensions,
+ @JsonProperty("turboLoadingNodes") @Nullable Set<String>
turboLoadingNodes
)
{
this.markSegmentAsUnusedDelayMillis =
@@ -162,6 +167,7 @@ public class CoordinatorDynamicConfig
);
this.debugDimensions = debugDimensions;
this.validDebugDimensions = validateDebugDimensions(debugDimensions);
+ this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes,
Set.of());
}
private Map<Dimension, String> validateDebugDimensions(Map<String, String>
debugDimensions)
@@ -200,6 +206,13 @@ public class CoordinatorDynamicConfig
}
}
+ public SegmentLoadingMode getLoadingModeForServer(String serverName)
+ {
+ return turboLoadingNodes.contains(serverName) ?
+ SegmentLoadingMode.TURBO :
+ SegmentLoadingMode.NORMAL;
+ }
+
@JsonProperty("millisToWaitBeforeDeleting")
public long getMarkSegmentAsUnusedDelayMillis()
{
@@ -308,6 +321,19 @@ public class CoordinatorDynamicConfig
return replicateAfterLoadTimeout;
}
+ /**
+ * List of servers to put in turbo-loading mode. These servers will use a
larger thread pool to load
+ * segments. This decreases the average time taken to load segments.
However, this also means fewer resources
+ * available to query threads which may cause a drop in query performance.
+ *
+ * @return Set of host:port entries
+ */
+ @JsonProperty
+ public Set<String> getTurboLoadingNodes()
+ {
+ return turboLoadingNodes;
+ }
+
@Override
public String toString()
{
@@ -326,6 +352,7 @@ public class CoordinatorDynamicConfig
", decommissioningNodes=" + decommissioningNodes +
", pauseCoordination=" + pauseCoordination +
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
+ ", turboLoadingNodes=" + turboLoadingNodes +
'}';
}
@@ -359,6 +386,7 @@ public class CoordinatorDynamicConfig
dataSourcesToNotKillStalePendingSegmentsIn,
that.dataSourcesToNotKillStalePendingSegmentsIn)
&& Objects.equals(decommissioningNodes, that.decommissioningNodes)
+ && Objects.equals(turboLoadingNodes, that.turboLoadingNodes)
&& Objects.equals(debugDimensions, that.debugDimensions);
}
@@ -378,7 +406,8 @@ public class CoordinatorDynamicConfig
dataSourcesToNotKillStalePendingSegmentsIn,
decommissioningNodes,
pauseCoordination,
- debugDimensions
+ debugDimensions,
+ turboLoadingNodes
);
}
@@ -430,6 +459,7 @@ public class CoordinatorDynamicConfig
private Boolean replicateAfterLoadTimeout;
private Boolean useRoundRobinSegmentAssignment;
private Boolean smartSegmentLoading;
+ private Set<String> turboLoadingNodes;
public Builder()
{
@@ -452,7 +482,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean
replicateAfterLoadTimeout,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean
useRoundRobinSegmentAssignment,
@JsonProperty("smartSegmentLoading") @Nullable Boolean
smartSegmentLoading,
- @JsonProperty("debugDimensions") @Nullable Map<String, String>
debugDimensions
+ @JsonProperty("debugDimensions") @Nullable Map<String, String>
debugDimensions,
+ @JsonProperty("turboLoadingNodes") @Nullable Set<String>
turboLoadingNodes
)
{
this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
@@ -471,6 +502,7 @@ public class CoordinatorDynamicConfig
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
this.smartSegmentLoading = smartSegmentLoading;
this.debugDimensions = debugDimensions;
+ this.turboLoadingNodes = turboLoadingNodes;
}
public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
@@ -491,6 +523,12 @@ public class CoordinatorDynamicConfig
return this;
}
+ public Builder withTurboLoadingNodes(Set<String> turboLoadingNodes)
+ {
+ this.turboLoadingNodes = turboLoadingNodes;
+ return this;
+ }
+
public Builder withReplicantLifetime(int replicantLifetime)
{
this.replicantLifetime = replicantLifetime;
@@ -582,7 +620,8 @@ public class CoordinatorDynamicConfig
valueOrDefault(replicateAfterLoadTimeout,
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
valueOrDefault(useRoundRobinSegmentAssignment,
Defaults.USE_ROUND_ROBIN_ASSIGNMENT),
valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING),
- debugDimensions
+ debugDimensions,
+ turboLoadingNodes
);
}
@@ -612,7 +651,8 @@ public class CoordinatorDynamicConfig
valueOrDefault(replicateAfterLoadTimeout,
defaults.getReplicateAfterLoadTimeout()),
valueOrDefault(useRoundRobinSegmentAssignment,
defaults.isUseRoundRobinSegmentAssignment()),
valueOrDefault(smartSegmentLoading,
defaults.isSmartSegmentLoading()),
- valueOrDefault(debugDimensions, defaults.getDebugDimensions())
+ valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
+ valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes())
);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
index f6f402037d1..42af2e948cf 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
@@ -22,8 +22,11 @@ package org.apache.druid.server.coordinator.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.RE;
import org.joda.time.Duration;
+import javax.annotation.Nullable;
+
public class HttpLoadQueuePeonConfig
{
private static final Duration DEFAULT_LOAD_TIMEOUT =
Duration.standardMinutes(15);
@@ -35,21 +38,28 @@ public class HttpLoadQueuePeonConfig
private final Duration repeatDelay;
@JsonProperty
- private final int batchSize;
+ @Nullable
+ private final Integer batchSize;
@JsonCreator
public HttpLoadQueuePeonConfig(
@JsonProperty("hostTimeout") Duration hostTimeout,
@JsonProperty("repeatDelay") Duration repeatDelay,
- @JsonProperty("batchSize") Integer batchSize
+ @JsonProperty("batchSize") @Nullable Integer batchSize
)
{
this.hostTimeout = Configs.valueOrDefault(hostTimeout,
Duration.standardMinutes(5));
this.repeatDelay = Configs.valueOrDefault(repeatDelay,
Duration.standardMinutes(1));
- this.batchSize = Configs.valueOrDefault(batchSize, 1);
+
+ if (batchSize != null && batchSize < 1) {
+ throw new RE("Batch size must be greater than 0.");
+ }
+
+ this.batchSize = batchSize;
}
- public int getBatchSize()
+ @Nullable
+ public Integer getBatchSize()
{
return batchSize;
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 015b5686769..4f5ff04911e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -22,9 +22,11 @@ package org.apache.druid.server.coordinator.loading;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +47,8 @@ import
org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.server.http.SegmentLoadingCapabilities;
+import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -53,7 +57,6 @@ import org.joda.time.Duration;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import java.io.InputStream;
-import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -69,6 +72,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
/**
*
@@ -82,6 +86,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
new TypeReference<>() {};
private static final EmittingLogger log = new
EmittingLogger(HttpLoadQueuePeon.class);
+ private static final long DEFAULT_TIMEOUT = 10000L;
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicReference<CoordinatorRunStats> stats = new
AtomicReference<>(new CoordinatorRunStats());
@@ -114,19 +119,21 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
- private final URL changeRequestURL;
private final String serverId;
private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
private final ExecutorService callBackExecutor;
+ private final Supplier<SegmentLoadingMode> loadingModeSupplier;
private final ObjectWriter requestBodyWriter;
+ private final SegmentLoadingCapabilities serverCapabilities;
public HttpLoadQueuePeon(
String baseUrl,
ObjectMapper jsonMapper,
HttpClient httpClient,
HttpLoadQueuePeonConfig config,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
ScheduledExecutorService processingExecutor,
ExecutorService callBackExecutor
)
@@ -139,17 +146,48 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/loadCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new
BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(DEFAULT_TIMEOUT)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ int batchSize = config.getBatchSize() == null ? 1 :
config.getBatchSize();
+ SegmentLoadingCapabilities defaultCapabilities = new
SegmentLoadingCapabilities(batchSize, batchSize);
+ log.warn(
+ "Historical capabilities endpoint not found at URL[%s]. Using
default values[%s].",
+ segmentLoadingCapabilitiesURL,
+ defaultCapabilities
+ );
+ return defaultCapabilities;
+ } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ log.makeAlert("Received status[%s] when fetching loading capabilities
from server[%s]", responseHandler.getStatus(), serverId);
+ throw new RE("Received status[%s] when fetching loading capabilities
from server[%s]", responseHandler.getStatus(), serverId);
+ }
+
+ return jsonMapper.readValue(
+ stream,
+ SegmentLoadingCapabilities.class
);
}
- catch (MalformedURLException ex) {
- throw new RuntimeException(ex);
+ catch (Throwable th) {
+ throw new RE(th, "Received error while fetching historical capabilities
from Server[%s].", serverId);
}
}
@@ -160,7 +198,8 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
return;
}
- final int batchSize = config.getBatchSize();
+ final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
+ final int batchSize = calculateBatchSize(loadingMode);
final List<DataSegmentChangeRequest> newRequests = new
ArrayList<>(batchSize);
@@ -194,19 +233,28 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
if (newRequests.isEmpty()) {
log.trace(
"[%s]Found no load/drop requests. SegmentsToLoad[%d],
SegmentsToDrop[%d], batchSize[%d].",
- serverId, segmentsToLoad.size(), segmentsToDrop.size(),
config.getBatchSize()
+ serverId, segmentsToLoad.size(), segmentsToDrop.size(), batchSize
);
mainLoopInProgress.set(false);
return;
}
try {
- log.trace("Sending [%d] load/drop requests to Server[%s].",
newRequests.size(), serverId);
+ log.trace("Sending [%d] load/drop requests to Server[%s] in
loadingMode[%s].", newRequests.size(), serverId, loadingMode);
final boolean hasLoadRequests = newRequests.stream().anyMatch(r -> r
instanceof SegmentChangeRequestLoad);
if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) {
loadingRateTracker.markBatchLoadingStarted();
}
+ final URL changeRequestURL = new URL(
+ new URL(serverId),
+ StringUtils.nonStrictFormat(
+
"druid-internal/v1/segments/changeRequests?timeout=%d&loadingMode=%s",
+ config.getHostTimeout().getMillis(),
+ loadingMode
+ )
+ );
+
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
ListenableFuture<InputStream> future = httpClient.go(
new Request(HttpMethod.POST, changeRequestURL)
@@ -314,6 +362,25 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
}
}
+ /**
+ * Calculates the number of segments the server is capable of handling at a time. If loading segments in turbo loading
+ * mode, returns the number of turbo loading threads on the server. Otherwise, return the value set by the batch size
+ * runtime parameter, or number of normal threads on the server if the parameter is not set.
+ * Always returns a positive integer.
+ */
+ @VisibleForTesting
+ int calculateBatchSize(SegmentLoadingMode loadingMode)
+ {
+ int batchSize;
+ if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
+ batchSize = serverCapabilities.getNumTurboLoadingThreads();
+ } else {
+ batchSize = Configs.valueOrDefault(config.getBatchSize(), serverCapabilities.getNumLoadingThreads());
+ }
+
+ return Math.max(batchSize, 1);
+ }
+
private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus status)
{
changeRequest.go(
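
Note on the `calculateBatchSize` hunk above: the rule it describes can be sketched as standalone Java. This is only an illustration under stated assumptions, not the code from the commit. The class `BatchSizeSketch`, its `Mode` enum, and the plain `Integer`/`int` parameters are hypothetical stand-ins for `HttpLoadQueuePeon`, `SegmentLoadingMode`, `config.getBatchSize()`, and the two thread counts carried by `SegmentLoadingCapabilities`.

```java
public class BatchSizeSketch
{
  // Hypothetical stand-in for org.apache.druid.server.http.SegmentLoadingMode.
  public enum Mode { NORMAL, TURBO }

  /**
   * Turbo mode: use the server's turbo thread count and ignore the configured batch size.
   * Normal mode: use the configured batch size if set, else the server's normal thread count.
   * The result is clamped so that it is always at least 1.
   */
  public static int calculateBatchSize(
      Mode mode,
      Integer configuredBatchSize,   // null when the runtime property is not set
      int numLoadingThreads,
      int numTurboLoadingThreads
  )
  {
    final int batchSize;
    if (mode == Mode.TURBO) {
      batchSize = numTurboLoadingThreads;
    } else {
      batchSize = configuredBatchSize != null ? configuredBatchSize : numLoadingThreads;
    }
    return Math.max(batchSize, 1);
  }

  public static void main(String[] args)
  {
    // Same inputs as testBatchSize in this commit: capabilities (1, 3).
    System.out.println(calculateBatchSize(Mode.NORMAL, 10, 1, 3));   // configured value wins
    System.out.println(calculateBatchSize(Mode.NORMAL, null, 1, 3)); // falls back to normal threads
    System.out.println(calculateBatchSize(Mode.TURBO, null, 1, 3));  // turbo threads
    System.out.println(calculateBatchSize(Mode.TURBO, 10, 1, 0));    // clamped to 1
  }
}
```

The clamp matters mainly when a server reports zero threads, since a batch size of zero would presumably stall the peon's request loop.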
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
index ed6d2d5a3ea..d9fdca36c67 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
@@ -25,6 +25,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import java.util.List;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
/**
* Provides LoadQueuePeons
@@ -47,6 +49,7 @@ public class LoadQueueTaskMaster
private final ExecutorService callbackExec;
private final HttpLoadQueuePeonConfig config;
private final HttpClient httpClient;
+ private final Supplier<CoordinatorDynamicConfig> coordinatorDynamicConfigSupplier;
@GuardedBy("this")
private final AtomicBoolean isLeader = new AtomicBoolean(false);
@@ -58,7 +61,8 @@ public class LoadQueueTaskMaster
ScheduledExecutorService peonExec,
ExecutorService callbackExec,
HttpLoadQueuePeonConfig config,
- HttpClient httpClient
+ HttpClient httpClient,
+ Supplier<CoordinatorDynamicConfig> coordinatorDynamicConfigSupplier
)
{
this.jsonMapper = jsonMapper;
@@ -66,11 +70,20 @@ public class LoadQueueTaskMaster
this.callbackExec = callbackExec;
this.config = config;
this.httpClient = httpClient;
+ this.coordinatorDynamicConfigSupplier = coordinatorDynamicConfigSupplier;
}
private LoadQueuePeon createPeon(ImmutableDruidServer server)
{
- return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec);
+ return new HttpLoadQueuePeon(
+ server.getURL(),
+ jsonMapper,
+ httpClient,
+ config,
+ () -> coordinatorDynamicConfigSupplier.get().getLoadingModeForServer(server.getName()),
+ peonExec,
+ callbackExec
+ );
}
public Map<String, LoadQueuePeon> getAllPeons()
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 7c4392daf4e..48e03727199 100644
--- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -31,6 +31,7 @@ import org.apache.druid.client.HttpServerInventoryView;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
@@ -54,6 +55,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.List;
@@ -219,10 +221,10 @@ public class SegmentListerResource
* This endpoint is used by HttpLoadQueuePeon to assign segment load/drop requests batch. This endpoint makes the
* client wait till one of the following events occur. Note that this is implemented using async IO so no jetty
* threads are held while in wait.
- *
- * (1) Given timeout elapses.
- * (2) Some load/drop request completed.
- *
+ * <ol>
+ * <li>Given timeout elapses.</li>
+ * <li>Some load/drop request completed.</li>
+ * </ol>
* It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch.
*/
@POST
@@ -231,6 +233,7 @@ public class SegmentListerResource
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public void applyDataSegmentChangeRequests(
@QueryParam("timeout") long timeout,
+ @QueryParam("loadingMode") @Nullable SegmentLoadingMode loadingMode,
List<DataSegmentChangeRequest> changeRequestList,
@Context final HttpServletRequest req
) throws IOException
@@ -252,7 +255,7 @@ public class SegmentListerResource
final ResponseContext context = createContext(req.getHeader("Accept"));
final ListenableFuture<List<DataSegmentChangeResponse>> future =
- loadDropRequestHandler.processBatch(changeRequestList);
+ loadDropRequestHandler.processBatch(changeRequestList, loadingMode == null ? SegmentLoadingMode.NORMAL : loadingMode);
final AsyncContext asyncContext = req.startAsync();
@@ -327,6 +330,24 @@ public class SegmentListerResource
asyncContext.setTimeout(timeout);
}
+ @GET
+ @Path("/loadCapabilities")
+ @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+ public Response getSegmentLoadingCapabilities(
+ @Context final HttpServletRequest req
+ )
+ {
+ if (loadDropRequestHandler == null) {
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
+ }
+
+ SegmentLoaderConfig config = loadDropRequestHandler.getSegmentLoaderConfig();
+ SegmentLoadingCapabilities capabilitiesResponse =
+ new SegmentLoadingCapabilities(config.getNumLoadingThreads(), config.getNumBootstrapThreads());
+
+ return Response.status(Response.Status.OK).entity(capabilitiesResponse).build();
+ }
+
private void sendErrorResponse(HttpServletRequest req, int code, String error) throws IOException
{
AsyncContext asyncContext = req.startAsync();
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
new file mode 100644
index 00000000000..9dba8af5e6b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Contains information related to the capability of a server to load segments, for example the number of threads
+ * available.
+ */
+public class SegmentLoadingCapabilities
+{
+ private final int numLoadingThreads;
+ private final int numTurboLoadingThreads;
+
+ @JsonCreator
+ public SegmentLoadingCapabilities(
+ @JsonProperty("numLoadingThreads") int numLoadingThreads,
+ @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
+ )
+ {
+ this.numLoadingThreads = numLoadingThreads;
+ this.numTurboLoadingThreads = numTurboLoadingThreads;
+ }
+
+ @JsonProperty("numLoadingThreads")
+ public int getNumLoadingThreads()
+ {
+ return numLoadingThreads;
+ }
+
+ @JsonProperty("numTurboLoadingThreads")
+ public int getNumTurboLoadingThreads()
+ {
+ return numTurboLoadingThreads;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SegmentLoadingCapabilities{" +
+ "numLoadingThreads=" + numLoadingThreads +
+ ", numTurboLoadingThreads=" + numTurboLoadingThreads +
+ '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java
similarity index 70%
copy from server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
copy to server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java
index cd2a8c3740f..b3896a540bd 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java
@@ -17,16 +17,13 @@
* under the License.
*/
-package org.apache.druid.server.coordination;
-
-import org.apache.druid.timeline.DataSegment;
-
-import javax.annotation.Nullable;
+package org.apache.druid.server.http;
/**
+ * Determines the threadpool used by the historical to load segments.
*/
-public interface DataSegmentChangeHandler
+public enum SegmentLoadingMode
{
- void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback);
- void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback);
+ NORMAL,
+ TURBO
}
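
Note on how the new enum is resolved: the sketch below illustrates the two lookups this commit appears to rely on, written as standalone Java. It is an assumption-laden illustration, not the commit's implementation. `LoadingModeSketch`, `modeForServer`, and `orDefault` are hypothetical names. The first method mirrors the behavior asserted by `testTurboLoadingNodes` for `getLoadingModeForServer`, and the second mirrors the null check added to `applyDataSegmentChangeRequests`.

```java
import java.util.Set;

public class LoadingModeSketch
{
  // Hypothetical stand-in for org.apache.druid.server.http.SegmentLoadingMode.
  public enum Mode { NORMAL, TURBO }

  /**
   * Coordinator side: a server listed in turboLoadingNodes is loaded in TURBO mode,
   * every other server in NORMAL mode.
   */
  public static Mode modeForServer(Set<String> turboLoadingNodes, String serverName)
  {
    return turboLoadingNodes.contains(serverName) ? Mode.TURBO : Mode.NORMAL;
  }

  /**
   * Historical side: a request without a loadingMode query parameter is treated as NORMAL,
   * so that older callers which do not send the parameter keep working.
   */
  public static Mode orDefault(Mode requested)
  {
    return requested == null ? Mode.NORMAL : requested;
  }

  public static void main(String[] args)
  {
    Set<String> turboLoadingNodes = Set.of("localhost:8083");
    System.out.println(modeForServer(turboLoadingNodes, "localhost:8082"));
    System.out.println(modeForServer(turboLoadingNodes, "localhost:8083"));
    System.out.println(orDefault(null));
  }
}
```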
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index cd2fe2dbd63..7a8822a60d8 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentChangeStatus.State;
+import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@@ -48,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.druid.server.TestSegmentUtils.makeSegment;
@@ -106,7 +108,7 @@ public class SegmentLoadDropHandlerTest
scheduledExecutorFactory = (corePoolSize, nameFormat) -> {
// Override normal behavior by adding the runnable to a list so that you can make sure
- // all the shceduled runnables are executed by explicitly calling run() on each item in the list
+ // all the scheduled runnables are executed by explicitly calling run() on each item in the list
return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
{
@Override
@@ -230,7 +232,7 @@ public class SegmentLoadDropHandlerTest
new SegmentChangeRequestDrop(segment2)
);
- ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch);
+ ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
Map<DataSegmentChangeRequest, SegmentChangeStatus> expectedStatusMap = new HashMap<>();
expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING);
@@ -244,7 +246,7 @@ public class SegmentLoadDropHandlerTest
runnable.run();
}
- result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get();
+ result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1)), SegmentLoadingMode.TURBO).get();
Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus());
Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments());
@@ -271,7 +273,7 @@ public class SegmentLoadDropHandlerTest
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
- ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch);
+ ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
@@ -280,7 +282,7 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState());
Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
- future = handler.processBatch(batch);
+ future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
@@ -343,7 +345,7 @@ public class SegmentLoadDropHandlerTest
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
// Request 1: Load the segment
- ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch);
+ ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
@@ -354,7 +356,7 @@ public class SegmentLoadDropHandlerTest
// Request 2: Drop the segment
batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
- future = handler.processBatch(batch);
+ future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
@@ -372,7 +374,7 @@ public class SegmentLoadDropHandlerTest
// Request 3: Reload the segment
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
- future = handler.processBatch(batch);
+ future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
@@ -389,7 +391,7 @@ public class SegmentLoadDropHandlerTest
// Request 4: Try to reload the segment - segment is loaded and announced again
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
- future = handler.processBatch(batch);
+ future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
@@ -420,7 +422,8 @@ public class SegmentLoadDropHandlerTest
config,
segmentAnnouncer,
segmentManager,
- scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]")
+ scheduledExecutorFactory.create(5, "LoadDropHandlerTest-[%d]"),
+ (ThreadPoolExecutor) scheduledExecutorFactory.create(5, "TurboSegmentLoadDropHandlerTest-[%d]")
);
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index 36cb9c8439f..d91cb62050a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -101,7 +101,7 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(Duration.standardMinutes(1), config.getRepeatDelay());
Assert.assertEquals(Duration.standardMinutes(5), config.getHostTimeout());
Assert.assertEquals(Duration.standardMinutes(15), config.getLoadTimeout());
- Assert.assertEquals(1, config.getBatchSize());
+ Assert.assertNull(config.getBatchSize());
}
@Test
@@ -118,7 +118,7 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(Duration.standardMinutes(20), config.getRepeatDelay());
Assert.assertEquals(Duration.standardMinutes(10), config.getHostTimeout());
Assert.assertEquals(Duration.standardMinutes(15), config.getLoadTimeout());
- Assert.assertEquals(100, config.getBatchSize());
+ Assert.assertEquals(Integer.valueOf(100), config.getBatchSize());
}
@Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java
new file mode 100644
index 00000000000..4d9a2400215
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.config;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+public class HttpLoadQueuePeonConfigTest
+{
+ @Test
+ public void testValidateBatchSize() throws JsonProcessingException
+ {
+ ObjectMapper jsonMapper = new ObjectMapper();
+
+ MatcherAssert.assertThat(
+ Assert.assertThrows(ValueInstantiationException.class, () ->
+ jsonMapper.readValue("{\"batchSize\":0}", HttpLoadQueuePeonConfig.class)
+ ),
+ CoreMatchers.allOf(
+ CoreMatchers.instanceOf(ValueInstantiationException.class),
+ ThrowableMessageMatcher.hasMessage(
+ CoreMatchers.containsString("Batch size must be greater than 0.")
+ )
+ )
+ );
+
+ HttpLoadQueuePeonConfig emptyConfig = jsonMapper.readValue(
+ "{}",
+ HttpLoadQueuePeonConfig.class
+ );
+ Assert.assertNull(emptyConfig.getBatchSize());
+
+ HttpLoadQueuePeonConfig config = jsonMapper.readValue(
+ "{\"batchSize\":2}",
+ HttpLoadQueuePeonConfig.class
+ );
+ Assert.assertEquals(2, config.getBatchSize().intValue());
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
index 1dcf384d2e0..e511ce77ee2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
@@ -37,6 +37,8 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.server.http.SegmentLoadingCapabilities;
+import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@@ -81,13 +83,21 @@ public class HttpLoadQueuePeonTest
MAPPER,
httpClient,
new HttpLoadQueuePeonConfig(null, null, 10),
+ () -> SegmentLoadingMode.NORMAL,
new WrappingScheduledExecutorService(
"HttpLoadQueuePeonTest-%s",
httpClient.processingExecutor,
true
),
httpClient.callbackExecutor
- );
+ )
+ {
+ @Override
+ SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
+ return new SegmentLoadingCapabilities(1, 3);
+ }
+ };
httpLoadQueuePeon.start();
}
@@ -316,6 +326,37 @@ public class HttpLoadQueuePeonTest
);
}
+ @Test
+ public void testBatchSize()
+ {
+ Assert.assertEquals(10, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
+
+ // Without a batch size runtime parameter
+ httpLoadQueuePeon = new HttpLoadQueuePeon(
+ "http://dummy:4000",
+ MAPPER,
+ httpClient,
+ new HttpLoadQueuePeonConfig(null, null, null),
+ () -> SegmentLoadingMode.NORMAL,
+ new WrappingScheduledExecutorService(
+ "HttpLoadQueuePeonTest-%s",
+ httpClient.processingExecutor,
+ true
+ ),
+ httpClient.callbackExecutor
+ )
+ {
+ @Override
+ SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
+ return new SegmentLoadingCapabilities(1, 3);
+ }
+ };
+
+ Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
+ Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO));
+ }
+
private LoadPeonCallback markSegmentProcessed(DataSegment segment)
{
return success -> httpClient.processedSegments.add(segment);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index ddc360aa012..938b6b09b1a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -454,19 +454,22 @@ public class CoordinatorSimulationBuilder
createBalancerStrategy(balancerStrategy),
new HttpLoadQueuePeonConfig(null, null, null)
);
+
+ JacksonConfigManager jacksonConfigManager = mockConfigManager();
+ setDynamicConfig(dynamicConfig);
+
this.loadQueueTaskMaster = new LoadQueueTaskMaster(
OBJECT_MAPPER,
executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
coordinatorConfig.getHttpLoadQueuePeonConfig(),
- httpClient
+ httpClient,
+ () -> dynamicConfig
);
+
this.loadQueueManager =
new SegmentLoadQueueManager(coordinatorInventoryView, loadQueueTaskMaster);
- JacksonConfigManager jacksonConfigManager = mockConfigManager();
- setDynamicConfig(dynamicConfig);
-
this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class);
mocks.add(jacksonConfigManager);
mocks.add(lookupCoordinatorManager);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
index 5caa90d8dfd..877265d85b4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
@@ -32,6 +33,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DataSegmentChangeResponse;
import org.apache.druid.server.coordination.SegmentChangeStatus;
+import org.apache.druid.server.http.SegmentLoadingCapabilities;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -79,12 +81,16 @@ public class TestSegmentLoadingHttpClient implements HttpClient
}
@Override
+ @SuppressWarnings("unchecked")
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,
HttpResponseHandler<Intermediate, Final> handler,
Duration readTimeout
)
{
+ if (request.getUrl().toString().contains("/loadCapabilities")) {
+ return getCapabilities(handler);
+ }
return executorService.submit(() -> processRequest(request, handler));
}
@@ -143,6 +149,27 @@ public class TestSegmentLoadingHttpClient implements HttpClient
.collect(Collectors.toList());
}
+ private <Intermediate, Final> ListenableFuture<Final> getCapabilities(HttpResponseHandler<Intermediate, Final> handler)
+ {
+ try {
+ // Set response content and status
+ final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.setContent(ChannelBuffers.EMPTY_BUFFER);
+ handler.handleResponse(response, NOOP_TRAFFIC_COP);
+
+ // Serialize
+ SettableFuture future = SettableFuture.create();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ objectMapper.writeValue(baos, new SegmentLoadingCapabilities(1, 1));
+ future.set(new ByteArrayInputStream(baos.toByteArray()));
+ }
+ return future;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Processes each DataSegmentChangeRequest using the handler.
*/
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index eb7fb199287..1be987ddc27 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -51,7 +51,8 @@ public class CoordinatorDynamicConfigTest
+ " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+ " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
+ " \"pauseCoordination\": false,\n"
- + " \"replicateAfterLoadTimeout\": false\n"
+ + " \"replicateAfterLoadTimeout\": false,\n"
+ + " \"turboLoadingNodes\":[\"host1\", \"host3\"]\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
@@ -65,6 +66,7 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+ ImmutableSet<String> turboLoadingNodes = ImmutableSet.of("host1", "host3");
assertConfig(
actual,
1,
@@ -78,7 +80,8 @@ public class CoordinatorDynamicConfigTest
1,
decommissioning,
false,
- false
+ false,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
@@ -95,7 +98,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
false,
- false
+ false,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().build(actual);
@@ -112,7 +116,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
false,
- false
+ false,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
@@ -129,7 +134,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
true,
- false
+ false,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
@@ -146,7 +152,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
true,
- true
+ true,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().build(actual);
@@ -163,7 +170,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
true,
- true
+ true,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(0.1).build(actual);
@@ -180,7 +188,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
true,
- true
+ true,
+ turboLoadingNodes
);
actual = CoordinatorDynamicConfig.builder().withMaxKillTaskSlots(5).build(actual);
@@ -197,7 +206,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
true,
- true
+ true,
+ turboLoadingNodes
);
}
@@ -233,7 +243,8 @@ public class CoordinatorDynamicConfigTest
true,
false,
false,
- null
+ null,
+ ImmutableSet.of("host1")
);
Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
}
@@ -257,7 +268,8 @@ public class CoordinatorDynamicConfigTest
true,
false,
false,
- null
+ null,
+ ImmutableSet.of("host1")
);
Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn());
}
@@ -272,7 +284,8 @@ public class CoordinatorDynamicConfigTest
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
- + " \"maxSegmentsInNodeLoadingQueue\": 1\n"
+ + " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+ + " \"turboLoadingNodes\": [\"host3\",\"host4\"]\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
@@ -286,6 +299,7 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of();
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+ ImmutableSet<String> turboLoading = ImmutableSet.of("host3", "host4");
assertConfig(
actual,
1,
@@ -299,7 +313,8 @@ public class CoordinatorDynamicConfigTest
1,
decommissioning,
false,
- false
+ false,
+ turboLoading
);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
@@ -316,7 +331,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
false,
- false
+ false,
+ turboLoading
);
actual = CoordinatorDynamicConfig.builder().build(actual);
@@ -333,7 +349,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of("host1"),
false,
- false
+ false,
+ turboLoading
);
}
@@ -372,7 +389,8 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of(),
false,
- false
+ false,
+ ImmutableSet.of()
);
}
@@ -411,7 +429,8 @@ public class CoordinatorDynamicConfigTest
1,
decommissioning,
false,
- false
+ false,
+ ImmutableSet.of()
);
}
@@ -446,7 +465,8 @@ public class CoordinatorDynamicConfigTest
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
ImmutableSet.of(),
false,
- false
+ false,
+ ImmutableSet.of()
);
}
@@ -468,7 +488,8 @@ public class CoordinatorDynamicConfigTest
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
emptyList,
false,
- false
+ false,
+ ImmutableSet.of()
);
}
@@ -493,7 +514,8 @@ public class CoordinatorDynamicConfigTest
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
ImmutableSet.of(),
false,
- false
+ false,
+ ImmutableSet.of()
);
}
@@ -511,6 +533,18 @@ public class CoordinatorDynamicConfigTest
);
}
+ @Test
+ public void testTurboLoadingNodes()
+ {
+ CoordinatorDynamicConfig config = CoordinatorDynamicConfig
+ .builder()
+ .withTurboLoadingNodes(ImmutableSet.of("localhost:8083"))
+ .build();
+
+ Assert.assertEquals(SegmentLoadingMode.NORMAL, config.getLoadingModeForServer("localhost:8082"));
+ Assert.assertEquals(SegmentLoadingMode.TURBO, config.getLoadingModeForServer("localhost:8083"));
+ }
+
@Test
public void testEqualsAndHashCode()
{
@@ -533,7 +567,8 @@ public class CoordinatorDynamicConfigTest
int expectedMaxSegmentsInNodeLoadingQueue,
Set<String> decommissioningNodes,
boolean pauseCoordination,
- boolean replicateAfterLoadTimeout
+ boolean replicateAfterLoadTimeout,
+ Set<String> turboLoadingNodes
)
{
Assert.assertEquals(
@@ -554,6 +589,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes());
Assert.assertEquals(pauseCoordination, config.getPauseCoordination());
Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout());
+ Assert.assertEquals(turboLoadingNodes, config.getTurboLoadingNodes());
}
private static int getDefaultNumBalancerThreads()
diff --git a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
new file mode 100644
index 00000000000..0819439a0aa
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SegmentLoadingCapabilitiesTest
+{
+ private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ SegmentLoadingCapabilities capabilities = new SegmentLoadingCapabilities(1, 4);
+
+ SegmentLoadingCapabilities reread = jsonMapper.readValue(jsonMapper.writeValueAsString(capabilities), SegmentLoadingCapabilities.class);
+
+ Assert.assertEquals(capabilities.getNumLoadingThreads(), reread.getNumLoadingThreads());
+ Assert.assertEquals(capabilities.getNumTurboLoadingThreads(), reread.getNumTurboLoadingThreads());
+ }
+
+ @Test
+ public void testSerdeFromJson() throws JsonProcessingException
+ {
+ String json = "{\"numLoadingThreads\":3,\"numTurboLoadingThreads\":5}";
+ SegmentLoadingCapabilities reread = jsonMapper.readValue(json, SegmentLoadingCapabilities.class);
+
+ Assert.assertEquals(3, reread.getNumLoadingThreads());
+ Assert.assertEquals(5, reread.getNumTurboLoadingThreads());
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index b1fc5f63404..1f61d9716eb 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -306,7 +306,8 @@ public class CliCoordinator extends ServerRunnable
ScheduledExecutorFactory factory,
DruidCoordinatorConfig config,
@EscalatedGlobal HttpClient httpClient,
- Lifecycle lifecycle
+ Lifecycle lifecycle,
+ CoordinatorConfigManager coordinatorConfigManager
)
{
final ExecutorService callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
@@ -316,7 +317,8 @@ public class CliCoordinator extends ServerRunnable
factory.create(1, "Master-PeonExec--%d"),
callBackExec,
config.getHttpLoadQueuePeonConfig(),
- httpClient
+ httpClient,
+ coordinatorConfigManager::getCurrentDynamicConfig
);
}
}