kfaraz commented on code in PR #17863: URL: https://github.com/apache/druid/pull/17863#discussion_r2030892447
########## docs/configuration/index.md: ##########
@@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinator

|`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different Historical server. This helps improve segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
|`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread pool to load segments faster, but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the Coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once segment loading on the respective Historicals is finished. |none|
+|`cloneServers`| Experimental. Map of source to target Historicals to place into cloning mode. Target Historicals are not involved in normal segment assignment or segment balancing. Instead, segment assignments to the source Historical are mirrored to the target Historical, so that the source Historical becomes an exact copy. Segments on the target Historical do not count towards the desired segment replication counts.<br/>Please use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none|

Review Comment:
```suggestion
|`cloneServers`| Experimental. Map from source Historical server to target Historical server which should be made a clone of the source. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either.<br/>Use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none|
```

########## server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java: ##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
+ * The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handles cloning of historicals. Given the historical to historical clone mappings, based on
+ * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segments load or unload requests from the source
+ * historical to the target historical.

Review Comment:
Please add a line to this javadoc mentioning that if the source disappears, the target will remain in the last known state of the source unless removed from `cloneServers`.
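For readers following along: `cloneServers` is part of the Coordinator dynamic config, which is posted as a single JSON object (per the Druid docs, typically via `POST /druid/coordinator/v1/config`). A minimal sketch of the shape under discussion, with hypothetical host names and the source-to-target orientation as currently written in the PR:

```json
{
  "cloneServers": {
    "historical-source.example.com:8083": "historical-clone.example.com:8083"
  }
}
```

Here the key is the source Historical being mirrored and the value is the target Historical made into its clone.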
########## server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java: ##########
@@ -0,0 +1,130 @@
+public class CloneHistoricals implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(CloneHistoricals.class);
+  private final SegmentLoadQueueManager loadQueueManager;
+
+  public CloneHistoricals(SegmentLoadQueueManager loadQueueManager)
+  {
+    this.loadQueueManager = loadQueueManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+
+    if (cloneServers.isEmpty()) {
+      // No servers to be cloned.
+      return params;
+    }
+
+    // Create a map of host to historical.
+    final Map<String, ServerHolder> historicalMap = params.getDruidCluster()
+                                                          .getHistoricals()
+                                                          .values()
+                                                          .stream()
+                                                          .flatMap(Collection::stream)
+                                                          .collect(Collectors.toMap(
+                                                              serverHolder -> serverHolder.getServer().getHost(),
+                                                              serverHolder -> serverHolder
+                                                          ));
+
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      log.debug("Handling cloning for mapping: [%s]", entry);

Review Comment:
We can probably remove this.

########## server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java: ##########
@@ -0,0 +1,130 @@
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      log.debug("Handling cloning for mapping: [%s]", entry);
+
+      final String sourceHistoricalName = entry.getKey();
+      final ServerHolder sourceServer = historicalMap.get(sourceHistoricalName);
+
+      if (sourceServer == null) {
+        log.error(
+            "Could not process clone mapping[%s] as source historical[%s] does not exist.",
+            sourceHistoricalName,
+            entry
+        );
+        continue;
+      }
+
+      final String targetHistoricalName = entry.getValue();
+      final ServerHolder targetServer = historicalMap.get(targetHistoricalName);
+
+      if (targetServer == null) {
+        log.error(
+            "Could not process clone mapping[%s] as target historical[%s] does not exist.",
+            targetHistoricalName,
+            entry
+        );
+        continue;
+      }

Review Comment:
We can probably merge these conditions into one.
```suggestion
      final String targetHistoricalName = entry.getValue();
      final ServerHolder targetServer = historicalMap.get(targetHistoricalName);

      if (sourceServer == null || targetServer == null) {
        log.error(
            "Could not process clone mapping[%s] as historical[%s] does not exist.",
            entry,
            (sourceServer == null ? sourceHistoricalName : targetHistoricalName)
        );
        continue;
      }
```

########## docs/configuration/index.md: ##########
@@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinator
+|`cloneServers`| Experimental. Map of source to target Historicals to place into cloning mode. Target Historicals are not involved in normal segment assignment or segment balancing. Instead, segment assignments to the source Historical are mirrored to the target Historical, so that the source Historical becomes an exact copy. Segments on the target Historical do not count towards the desired segment replication counts.<br/>Please use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none|

Review Comment:
Maybe also add a line about "if the source disappears, the target remains in the last known state of the source server until removed from `cloneServers`".
########## server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java: ##########
@@ -446,7 +446,8 @@ private int dropReplicas(
     for (ServerHolder server : eligibleServers) {
       if (server.isDecommissioning()) {
         eligibleDyingServers.add(server);
-      } else {
+      } else if (!server.isUnmanaged()) {

Review Comment:
Rather than having this check in all places, we should keep a separate set of `managedHistoricals` in `DruidCluster`, add a method `getManagedHistoricals()`, and have all places that do segment assignment, balancing, etc. use `getManagedHistoricals()` instead of `getHistoricals()`. Similarly, you would also need to keep a set of `allManagedServers`. Please let me know if you need any further clarification on this.

########## server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java: ##########
@@ -322,6 +325,18 @@ public boolean getReplicateAfterLoadTimeout()
     return replicateAfterLoadTimeout;
   }

+  /**
+   * Map of source to target Historicals to place into cloning mode. Target historicals are not involved in normal
+   * segment assignment or segment balancing. Instead, segment assignments to the source Historical are mirrored to
+   * the target Historical, so that the source Historical becomes an exact copy. Segments on the target Historical do
+   * not count towards the desired segment replication counts.
+   */
+  @JsonProperty
+  public Map<String, String> getCloneServers()

Review Comment:
Given the format of the other configs like `turboLoadingNodes` and `decommissioningNodes`, and also the overall logic, I think it would make more sense for the map to be from target to source rather than the opposite. It's easier to reason about the map that way, as "cloneServers is the list of servers that clone some other servers". Also, multiple target servers could then potentially clone the same source server (although I can't think of a case when we might need this).
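To make the orientation question above concrete, here is a hedged, self-contained sketch (plain JDK types only; the class name, helper, and server names are hypothetical, not Druid APIs) showing why the reviewer's target-to-source orientation composes more naturally:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the proposed target -> source orientation for `cloneServers`:
// "each key is a server that clones some other server".
public class CloneMappingSketch
{
  // Inverts a source -> target map (the PR's current orientation) into
  // the reviewer's proposed target -> source orientation.
  public static Map<String, String> toTargetToSource(Map<String, String> sourceToTarget)
  {
    final Map<String, String> targetToSource = new LinkedHashMap<>();
    sourceToTarget.forEach((source, target) -> targetToSource.put(target, source));
    return targetToSource;
  }

  public static void main(String[] args)
  {
    final Map<String, String> sourceToTarget = new LinkedHashMap<>();
    sourceToTarget.put("histA:8083", "histB:8083");

    final Map<String, String> targetToSource = toTargetToSource(sourceToTarget);

    // With target as the key, a second target can clone the same source
    // without a key collision, which source -> target cannot express.
    targetToSource.put("histC:8083", "histA:8083");

    System.out.println(targetToSource);
  }
}
```

With source as the key, mapping one source to two targets would require a multimap; with target as the key, it is just two entries.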
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
For additional commands, e-mail: [email protected]
