diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 92942a068cf..6242eced1f2 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -694,6 +694,7 @@ These coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters and will replace `cost` in the future versions, users are invited to try it. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)| +|`druid.coordinator.loadqueuepeon.threadPoolSize`|How many threads to use for the loadqueuepeon to manage the load and drop of segments.|1| |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false| |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java index e0972260ac9..a0e444f4415 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -223,24 +223,26 @@ public void unmarkSegmentToDrop(DataSegment dataSegment) private void processSegmentChangeRequest() { - if (currentlyProcessing != null) { - log.debug( - "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].", - basePath, - currentlyProcessing.getSegmentIdentifier() - ); + synchronized (lock) { + if (currentlyProcessing != null) { + log.debug( + "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].", + basePath, + currentlyProcessing.getSegmentIdentifier() + ); - return; - } + return; + } - if (!segmentsToDrop.isEmpty()) { - currentlyProcessing = segmentsToDrop.firstEntry().getValue(); - log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); - } else if (!segmentsToLoad.isEmpty()) { - currentlyProcessing = segmentsToLoad.firstEntry().getValue(); - log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); - } else { - return; + if (!segmentsToDrop.isEmpty()) { + currentlyProcessing = segmentsToDrop.firstEntry().getValue(); + log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + } else if (!segmentsToLoad.isEmpty()) { + currentlyProcessing = segmentsToLoad.firstEntry().getValue(); + log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + } else { + return; + } } try { diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 7f36e0cb606..212a752cf5c 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -233,10 +233,14 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( ZkPathsConfig zkPaths ) { + int poolSize = Integer.parseInt(properties.getProperty( + "druid.coordinator.loadqueuepeon.threadPoolSize", + "1" + )); return new LoadQueueTaskMaster( curator, jsonMapper, - factory.create(1, "Master-PeonExec--%d"), + factory.create(poolSize, "Master-PeonExec--%d"), Executors.newSingleThreadExecutor(), config, httpClient,
With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
