kfaraz commented on code in PR #17863:
URL: https://github.com/apache/druid/pull/17863#discussion_r2030892447


##########
docs/configuration/index.md:
##########
@@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinat
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
 |`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the segment loading on the respective historicals is finished. |none|
+|`cloneServers`| Experimental. Map of source to target Historicals to place into cloning mode. Target historicals are not involved in normal segment assignment or segment balancing. Instead, segment assignments to the source Historical are mirrored to the target Historical, so that the source Historical becomes an exact copy. Segments on the target Historical do not count towards the desired segment replication counts.<br/>Please use this config with caution. All servers should eventually be removed from this list once the desired state on the respective historicals is achieved. |none|

Review Comment:
   ```suggestion
   |`cloneServers`| Experimental. Map from source Historical server to target Historical server which should be made a clone of the source. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either.<br/>Use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none|
   ```
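
For context, a mapping like this might be set through the Coordinator dynamic config (the `cloneServers` field name comes from this PR; host names here are hypothetical):

```json
{
  "cloneServers": {
    "source-historical.example.com:8083": "target-historical.example.com:8083"
  }
}
```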



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handles cloning of historicals. Given the historical to historical clone mappings, based on
+ * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segment load or unload requests from the source
+ * historical to the target historical.

Review Comment:
   Please add a line to this javadoc mentioning that if the source disappears, 
the target will remain in the last known state of the source unless removed 
from `cloneServers`.
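
   For instance, the extra javadoc line could read something like (wording is just a sketch):

   ```java
 * Note: If the source historical disappears from the cluster, the target historical
 * remains in the last known state of the source until the mapping is removed from
 * {@code cloneServers}.
   ```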



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handles cloning of historicals. Given the historical to historical clone mappings, based on
+ * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segment load or unload requests from the source
+ * historical to the target historical.
+ */
+public class CloneHistoricals implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(CloneHistoricals.class);
+  private final SegmentLoadQueueManager loadQueueManager;
+
+  public CloneHistoricals(SegmentLoadQueueManager loadQueueManager)
+  {
+    this.loadQueueManager = loadQueueManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+
+    if (cloneServers.isEmpty()) {
+      // No servers to be cloned.
+      return params;
+    }
+
+    // Create a map of host to historical.
+    final Map<String, ServerHolder> historicalMap = params.getDruidCluster()
+                                                          .getHistoricals()
+                                                          .values()
+                                                          .stream()
+                                                          .flatMap(Collection::stream)
+                                                          .collect(Collectors.toMap(
+                                                              serverHolder -> serverHolder.getServer().getHost(),
+                                                              serverHolder -> serverHolder
+                                                          ));
+
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      log.debug("Handling cloning for mapping: [%s]", entry);

Review Comment:
   We can probably remove this.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handles cloning of historicals. Given the historical to historical clone mappings, based on
+ * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segment load or unload requests from the source
+ * historical to the target historical.
+ */
+public class CloneHistoricals implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(CloneHistoricals.class);
+  private final SegmentLoadQueueManager loadQueueManager;
+
+  public CloneHistoricals(SegmentLoadQueueManager loadQueueManager)
+  {
+    this.loadQueueManager = loadQueueManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+
+    if (cloneServers.isEmpty()) {
+      // No servers to be cloned.
+      return params;
+    }
+
+    // Create a map of host to historical.
+    final Map<String, ServerHolder> historicalMap = params.getDruidCluster()
+                                                          .getHistoricals()
+                                                          .values()
+                                                          .stream()
+                                                          .flatMap(Collection::stream)
+                                                          .collect(Collectors.toMap(
+                                                              serverHolder -> serverHolder.getServer().getHost(),
+                                                              serverHolder -> serverHolder
+                                                          ));
+
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      log.debug("Handling cloning for mapping: [%s]", entry);
+
+      final String sourceHistoricalName = entry.getKey();
+      final ServerHolder sourceServer = historicalMap.get(sourceHistoricalName);
+
+      if (sourceServer == null) {
+        log.error(
+            "Could not process clone mapping[%s] as source historical[%s] does not exist.",
+            entry,
+            sourceHistoricalName
+        );
+        continue;
+      }
+
+      final String targetHistoricalName = entry.getValue();
+      final ServerHolder targetServer = historicalMap.get(targetHistoricalName);
+
+      if (targetServer == null) {
+        log.error(
+            "Could not process clone mapping[%s] as target historical[%s] does not exist.",
+            entry,
+            targetHistoricalName
+        );
+        continue;
+      }

Review Comment:
   We can probably merge these conditions into one.
   
   ```suggestion
      final String targetHistoricalName = entry.getValue();
      final ServerHolder targetServer = historicalMap.get(targetHistoricalName);

      if (sourceServer == null || targetServer == null) {
        log.error(
            "Could not process clone mapping[%s] as historical[%s] does not exist.",
            entry,
            (sourceServer == null ? sourceHistoricalName : targetHistoricalName)
        );
        continue;
      }
   ```



##########
docs/configuration/index.md:
##########
@@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinat
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
 |`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the segment loading on the respective historicals is finished. |none|
+|`cloneServers`| Experimental. Map of source to target Historicals to place into cloning mode. Target historicals are not involved in normal segment assignment or segment balancing. Instead, segment assignments to the source Historical are mirrored to the target Historical, so that the source Historical becomes an exact copy. Segments on the target Historical do not count towards the desired segment replication counts.<br/>Please use this config with caution. All servers should eventually be removed from this list once the desired state on the respective historicals is achieved. |none|

Review Comment:
   Maybe also add a line about "if the source disappears, the target remains in the last known state of the source server until it is removed from `cloneServers`".



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java:
##########
@@ -446,7 +446,8 @@ private int dropReplicas(
     for (ServerHolder server : eligibleServers) {
       if (server.isDecommissioning()) {
         eligibleDyingServers.add(server);
-      } else {
+      } else if (!server.isUnmanaged()) {

Review Comment:
   Rather than having this check in all places, we should keep a separate set of `managedHistoricals` in `DruidCluster` and add a method `getManagedHistoricals()`. All places that do segment assignment, balancing, etc. should then use `getManagedHistoricals()` instead of `getHistoricals()`.
   
   Similarly, you would also need to keep a set of `allManagedServers`.
   Please let me know if you need any further clarification on this.
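
   To illustrate the idea, here is a minimal self-contained sketch; the class and field names are stand-ins for illustration, not the actual Druid implementation:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified stand-in for DruidCluster, illustrating the proposed split
// between all historicals and the managed subset. Names are assumptions.
class ClusterSketch
{
  private final Set<String> historicals = new LinkedHashSet<>();
  private final Set<String> unmanagedHistoricals = new LinkedHashSet<>();

  void addHistorical(String host, boolean unmanaged)
  {
    historicals.add(host);
    if (unmanaged) {
      unmanagedHistoricals.add(host);
    }
  }

  /** All historicals known to the cluster, including clone targets. */
  Set<String> getHistoricals()
  {
    return historicals;
  }

  /** Only the historicals that participate in assignment and balancing. */
  Set<String> getManagedHistoricals()
  {
    final Set<String> managed = new LinkedHashSet<>(historicals);
    managed.removeAll(unmanagedHistoricals);
    return managed;
  }
}
```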



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -322,6 +325,18 @@ public boolean getReplicateAfterLoadTimeout()
     return replicateAfterLoadTimeout;
   }
 
+  /**
+   * Map of source to target Historicals to place into cloning mode. Target historicals are not involved in normal
+   * segment assignment or segment balancing. Instead, segment assignments to the source Historical are mirrored to
+   * the target Historical, so that the source Historical becomes an exact copy. Segments on the target Historical do
+   * not count towards the desired segment replication counts.
+   */
+  @JsonProperty
+  public Map<String, String> getCloneServers()

Review Comment:
   Given the format of the other configs like `turboLoadingNodes` and `decommissioningNodes`, and also the overall logic, I think it would make more sense for the map to be from target to source rather than the opposite.
   
   It's easier to reason about the map in that way, as "cloneServers is the 
list of servers that clone some other servers".
   Also, then multiple target servers could potentially clone the same source 
server (although I can't think of a case when we might need this).
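
   Under that reversal, each key would be a clone target and each value its source, e.g. (hypothetical host names):

```json
{
  "cloneServers": {
    "clone-historical.example.com:8083": "source-historical.example.com:8083"
  }
}
```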



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
