cryptoe commented on code in PR #17899:
URL: https://github.com/apache/druid/pull/17899#discussion_r2048632065


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java:
##########
@@ -162,7 +163,7 @@ public List<InputSlice> sliceDynamic(
    */
   int findWorkerForServerSelector(final ServerSelector serverSelector, final 
int maxNumSlices)
   {
-    final QueryableDruidServer server = serverSelector.pick(null);
+    final QueryableDruidServer server = serverSelector.pick(null, 
HistoricalFilter.IDENTITY_FILTER);

Review Comment:
   Lets mention in the release notes clearly that the routing filtering will 
not work for DART. 
   Also mention that since MSQ does not query historicals, that execution 
engine is not affected by these changes. 



##########
docs/querying/query-context.md:
##########
@@ -66,6 +66,7 @@ See [SQL query context](sql-query-context.md) for other query 
context parameters
 |`debug`| `false` | Flag indicating whether to enable debugging outputs for 
the query. When set to false, no additional logs will be produced (logs 
produced will be entirely dependent on your logging level). When set to true, 
the following addition logs will be produced:<br />- Log the stack trace of the 
exception (if any) produced by the query |
 |`setProcessingThreadNames`|`true`| Whether processing thread names will be 
set to `queryType_dataSource_intervals` while processing a query. This aids in 
interpreting thread dumps, and is on by default. Query overhead can be reduced 
slightly by setting this to `false`. This has a tiny effect in most scenarios, 
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
 |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge 
two Project operators when inlining expressions causes complexity to increase. 
Implemented as a workaround to exception `There are not enough rules to produce 
a node with desired properties: convention=DRUID, sort=[]` thrown after 
rejecting the merge of two projects.|
+|`cloneQueryMode`|`EXCLUDE`| Indicates whether clone Historicals should be 
queried by brokers. Clone servers are created by the `cloneServers` Coordinator 
dynamic configuration. Possible values are `EXCLUDE`, `INCLUDE` and `ONLY`. 
`EXCLUDE` means that clone Historicals are not queried by the broker. `ONLY` 
indicates that when given a choice between the clone Historical and the 
original Historical which is being cloned, the broker chooses the clones; 
Historicals which are not involved in the cloning process will still be 
queried. `INCLUDE` means that broker queries any Historical without regarding 
clone status. |

Review Comment:
   Instead of `ONLY` should it be `CLONE_PREFERRED`. 



##########
processing/src/main/java/org/apache/druid/query/CloneQueryMode.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+public enum CloneQueryMode

Review Comment:
   Also mention how this mode is set ie via Query context. 



##########
server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CoordinatorDynamicConfigView
+{
+  private static final Logger log = new 
Logger(CoordinatorDynamicConfigView.class);
+  private final CoordinatorClient coordinatorClient;
+
+  @Inject
+  public CoordinatorDynamicConfigView(CoordinatorClient coordinatorClient)
+  {
+    this.coordinatorClient = coordinatorClient;
+  }
+
+  private final AtomicReference<CoordinatorDynamicConfig> config = new 
AtomicReference<>();
+
+  public CoordinatorDynamicConfig getConfig()
+  {
+    return config.get();
+  }
+
+  public Set<String> getTargetCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();
+    return coordinatorDynamicConfig.getCloneServers().keySet();

Review Comment:
   Is this map immutable ?



##########
server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CoordinatorDynamicConfigView
+{
+  private static final Logger log = new 
Logger(CoordinatorDynamicConfigView.class);
+  private final CoordinatorClient coordinatorClient;
+
+  @Inject
+  public CoordinatorDynamicConfigView(CoordinatorClient coordinatorClient)
+  {
+    this.coordinatorClient = coordinatorClient;
+  }
+
+  private final AtomicReference<CoordinatorDynamicConfig> config = new 
AtomicReference<>();
+
+  public CoordinatorDynamicConfig getConfig()
+  {
+    return config.get();
+  }
+
+  public Set<String> getTargetCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();
+    return coordinatorDynamicConfig.getCloneServers().keySet();
+  }
+
+  public Set<String> getSourceCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();

Review Comment:
   If you wanna do this via atomic reference, create a snapshot method in the 
coordinatorDynamicConfig. 



##########
server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CoordinatorDynamicConfigView
+{
+  private static final Logger log = new 
Logger(CoordinatorDynamicConfigView.class);
+  private final CoordinatorClient coordinatorClient;
+
+  @Inject
+  public CoordinatorDynamicConfigView(CoordinatorClient coordinatorClient)
+  {
+    this.coordinatorClient = coordinatorClient;
+  }
+
+  private final AtomicReference<CoordinatorDynamicConfig> config = new 
AtomicReference<>();
+
+  public CoordinatorDynamicConfig getConfig()
+  {
+    return config.get();
+  }
+
+  public Set<String> getTargetCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();
+    return coordinatorDynamicConfig.getCloneServers().keySet();
+  }
+
+  public Set<String> getSourceCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();
+    return new HashSet<>(coordinatorDynamicConfig.getCloneServers().values());
+  }
+
+  public void updateCloneServers(CoordinatorDynamicConfig updatedConfig)

Review Comment:
   Please add javadocs. 



##########
server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CoordinatorDynamicConfigView
+{
+  private static final Logger log = new 
Logger(CoordinatorDynamicConfigView.class);
+  private final CoordinatorClient coordinatorClient;
+
+  @Inject
+  public CoordinatorDynamicConfigView(CoordinatorClient coordinatorClient)
+  {
+    this.coordinatorClient = coordinatorClient;
+  }
+
+  private final AtomicReference<CoordinatorDynamicConfig> config = new 
AtomicReference<>();
+
+  public CoordinatorDynamicConfig getConfig()
+  {
+    return config.get();
+  }
+
+  public Set<String> getTargetCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();
+    return coordinatorDynamicConfig.getCloneServers().keySet();
+  }
+
+  public Set<String> getSourceCloneServers()
+  {
+    CoordinatorDynamicConfig coordinatorDynamicConfig = config.get();
+    return new HashSet<>(coordinatorDynamicConfig.getCloneServers().values());
+  }
+
+  public void updateCloneServers(CoordinatorDynamicConfig updatedConfig)
+  {
+    config.set(updatedConfig);

Review Comment:
   Whenever we update, can we also maintain an immutable ignore server list 
here and then just fetch it on query time. 



##########
server/src/main/java/org/apache/druid/server/BrokerQueryResource.java:
##########
@@ -89,19 +95,22 @@ public Response getQueryTargets(
       InputStream in,
       @QueryParam("pretty") String pretty,
       @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates,
+      @QueryParam("cloneQueryMode") CloneQueryMode cloneQueryMode,

Review Comment:
   Mark @Nullable ?



##########
server/src/main/java/org/apache/druid/server/BrokerQueryResource.java:
##########
@@ -89,19 +95,22 @@ public Response getQueryTargets(
       InputStream in,
       @QueryParam("pretty") String pretty,
       @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates,
+      @QueryParam("cloneQueryMode") CloneQueryMode cloneQueryMode,
       @Context final HttpServletRequest req
   ) throws IOException
   {
     final ResourceIOReaderWriter ioReaderWriter = 
createResourceIOReaderWriter(req, pretty != null);
     try {
       Query<?> query = ioReaderWriter.getRequestMapper().readValue(in, 
Query.class);
       ExecutionVertex ev = ExecutionVertex.of(query);
+      HistoricalFilter historicalFilter = new HistoricalFilter(configView, 
cloneQueryMode == null ? CloneQueryMode.EXCLUDE : cloneQueryMode);

Review Comment:
   ```suggestion
         HistoricalFilter historicalFilter = new HistoricalFilter(configView, 
cloneQueryMode == null ? QUERY_CONTEXT.DEFAULT : cloneQueryMode); 
   ```
   ?



##########
server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
+import org.apache.druid.rpc.FixedServiceLocator;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.net.URL;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Updates all brokers with the latest coordinator dynamic config.
+ */
+public class CoordinatorDynamicConfigSyncer
+{
+  private static final Logger log = new 
Logger(CoordinatorDynamicConfigSyncer.class);
+
+  private final CoordinatorConfigManager configManager;
+  private final ObjectMapper jsonMapper;
+  private final DruidNodeDiscoveryProvider druidNodeDiscovery;
+
+  private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = 
new AtomicReference<>();
+  private final ServiceClientFactory clientFactory;
+  private final ExecutorService exec;
+  private final Set<String> inSyncBrokers;
+
+  @Inject
+  public CoordinatorDynamicConfigSyncer(
+      @EscalatedGlobal final ServiceClientFactory clientFactory,
+      final CoordinatorConfigManager configManager,
+      @Json final ObjectMapper jsonMapper,
+      final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
+  )
+  {
+    this.clientFactory = clientFactory;
+    this.configManager = configManager;
+    this.jsonMapper = jsonMapper;
+    this.druidNodeDiscovery = druidNodeDiscoveryProvider;
+    this.exec = Execs.singleThreaded("DynamicConfigSyncer-%d");
+    this.inSyncBrokers = ConcurrentHashMap.newKeySet();
+  }
+
+  public void broadcastConfigToBrokers()
+  {
+    invalidateInSyncBrokersIfNeeded();
+    for (ServiceLocation broker : getKnownBrokers()) {
+      exec.submit(() -> pushConfigToBroker(broker));
+    }
+  }
+
+  public synchronized Set<String> getInSyncBrokers()

Review Comment:
   Why is this synchronized required ?



##########
server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CloneStatusManager
+{
+  private final AtomicReference<Map<String, CloneStatusMetrics>> 
cloneStatusSnapshot;
+
+  public CloneStatusManager()
+  {
+    this.cloneStatusSnapshot = new AtomicReference<>(ImmutableMap.of());
+  }
+
+  public Map<String, CloneStatusMetrics> getStatusForAllServers()
+  {
+    return cloneStatusSnapshot.get();
+  }
+
+  public CloneStatusMetrics getStatusForServer(String targetServer)
+  {
+    return cloneStatusSnapshot.get().get(targetServer);
+  }
+
+  public void updateStats(Map<String, ServerHolder> historicalMap, Map<String, 
String> cloneServers)
+  {
+    final Map<String, CloneStatusMetrics> newStatusMap = new HashMap<>();
+
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      final String targetServerName = entry.getKey();
+      final ServerHolder targetServer = historicalMap.get(entry.getKey());
+      final String sourceServerName = entry.getValue();
+
+      long segmentLoad = 0L;
+      long bytesLeft = 0L;
+      long segmentDrop = 0L;
+
+      CloneStatusMetrics newStatus;
+      if (targetServer == null) {
+        newStatus = CloneStatusMetrics.unknown(sourceServerName);
+      } else {
+
+        CloneStatusMetrics.Status status;
+        if (!historicalMap.containsKey(sourceServerName)) {
+          status = CloneStatusMetrics.Status.SOURCE_SERVER_MISSING;
+        } else {
+          status = CloneStatusMetrics.Status.LOADING;
+        }
+
+        for (Map.Entry<DataSegment, SegmentAction> queuedSegment : 
targetServer.getQueuedSegments().entrySet()) {
+          if (queuedSegment.getValue().isLoad()) {
+            segmentLoad += 1;
+            bytesLeft += queuedSegment.getKey().getSize();
+          } else {
+            segmentDrop += 1;
+          }
+        }
+        newStatus = new CloneStatusMetrics(sourceServerName, status, 
segmentLoad, segmentDrop, bytesLeft);
+      }
+      newStatusMap.put(targetServerName, newStatus);
+    }
+
+    cloneStatusSnapshot.set(newStatusMap);

Review Comment:
   this should be converted to an immutable map. 



##########
server/src/main/java/org/apache/druid/server/coordinator/CloneStatusMetrics.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class CloneStatusMetrics
+{
+  private final String sourceServer;
+  private final Status status;
+  private final long segmentLoadsRemaining;
+  private final long segmenetsDropsRemaining;
+  private final long bytesRemaining;
+
+  @JsonCreator
+  public CloneStatusMetrics(
+      @JsonProperty("sourceServer") String sourceServer,
+      @JsonProperty("status") Status status,
+      @JsonProperty("segmentLoadsRemaining") long segmentLoadsRemaining,
+      @JsonProperty("segmentDropsRemaining") long segmenetsDropsRemaining,
+      @JsonProperty("bytesRemaining") long bytesRemaining
+  )
+  {
+    this.sourceServer = sourceServer;
+    this.status = status;
+    this.segmentLoadsRemaining = segmentLoadsRemaining;
+    this.segmenetsDropsRemaining = segmenetsDropsRemaining;
+    this.bytesRemaining = bytesRemaining;
+  }
+
+  @JsonProperty("sourceServer")
+  public String getSourceServer()
+  {
+    return sourceServer;
+  }
+
+  @JsonProperty("segmentLoadsRemaining")
+  public long getSegmentLoadsRemaining()
+  {
+    return segmentLoadsRemaining;
+  }
+
+  @JsonProperty("segmentDropsRemaining")
+  public long getSegmenetsDropsRemaining()
+  {
+    return segmenetsDropsRemaining;
+  }
+
+  @JsonProperty("bytesRemaining")
+  public long getBytesRemaining()
+  {
+    return bytesRemaining;
+  }
+
+  @JsonProperty("status")
+  public Status getStatus()
+  {
+    return status;
+  }
+
+  public static CloneStatusMetrics unknown(String sourceServer)
+  {
+    return new CloneStatusMetrics(sourceServer, Status.TARGET_SERVER_MISSING, 
0, 0, 0);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CloneStatusMetrics that = (CloneStatusMetrics) o;
+    return segmentLoadsRemaining == that.segmentLoadsRemaining
+           && segmenetsDropsRemaining == that.segmenetsDropsRemaining
+           && bytesRemaining == that.bytesRemaining
+           && Objects.equals(sourceServer, that.sourceServer)
+           && status == that.status;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(sourceServer, status, segmentLoadsRemaining, 
segmenetsDropsRemaining, bytesRemaining);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CloneStatusMetrics{" +
+           "sourceServer='" + sourceServer + '\'' +
+           ", status=" + status +
+           ", segmentLoadsRemaining=" + segmentLoadsRemaining +
+           ", segmenetsDropsRemaining=" + segmenetsDropsRemaining +
+           ", bytesRemaining=" + bytesRemaining +
+           '}';
+  }
+
+  public enum Status
+  {
+    SOURCE_SERVER_MISSING,

Review Comment:
   Java docs for enums are generally helpful. 



##########
server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java:
##########
@@ -158,4 +164,28 @@ public Response getStatusOfDuties()
   {
     return Response.ok(new 
CoordinatorDutyStatus(coordinator.getStatusOfDuties())).build();
   }
+
+  @GET
+  @Path("/brokerConfigurationStatus")
+  @ResourceFilters(StateResourceFilter.class)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getBrokerStatus()
+  {
+    return 
Response.ok(coordinatorDynamicConfigSyncer.getInSyncBrokers()).build();
+  }
+
+  @GET
+  @Path("/cloneStatus")
+  @ResourceFilters(StateResourceFilter.class)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getCloneStatus(@QueryParam("targetServer") String 
targetServer)

Review Comment:
   Should this be marked nullable?



##########
processing/src/main/java/org/apache/druid/query/CloneQueryMode.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+public enum CloneQueryMode
+{
+  ONLY,

Review Comment:
   Please add java docs to this. 



##########
server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CoordinatorDynamicConfigView

Review Comment:
   Could you please add the updation flow of this class as the responsibility 
of updating this changes from broker startup , to coordinator pushing updates. 



##########
server/src/main/java/org/apache/druid/client/selector/HistoricalFilter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector;
+
+import com.google.common.collect.ImmutableSet;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import org.apache.druid.client.CoordinatorDynamicConfigView;
+import org.apache.druid.client.QueryableDruidServer;
+import org.apache.druid.query.CloneQueryMode;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HistoricalFilter implements 
Function<Int2ObjectRBTreeMap<Set<QueryableDruidServer>>, 
Int2ObjectRBTreeMap<Set<QueryableDruidServer>>>
+{
+  public static final HistoricalFilter IDENTITY_FILTER = new 
HistoricalFilter(ImmutableSet::of);
+
+  public HistoricalFilter(Supplier<Set<String>> serversToIgnoreSupplier)
+  {
+    this.serversToIgnoreSupplier = serversToIgnoreSupplier;
+  }
+
+  public HistoricalFilter(CoordinatorDynamicConfigView configView, 
CloneQueryMode cloneQueryMode)
+  {
+    this.serversToIgnoreSupplier = () -> {
+      final Set<String> serversToIgnore = new HashSet<>();

Review Comment:
   Can't we have this precomputed. 
   Clone servers are a one time thing. We might do that 10 times in 2 hours and 
then forget about it for a month or so. 
   The cost of creating these objects on each query seems a lot. 
   Suppose you have 1000 historicals, this method might not scale ?
   



##########
server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.client.CoordinatorDynamicConfigView;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/druid-internal/v1/dynamicConfiguration")
+public class DruidInternalDynamicConfigResource
+{
+  private final CoordinatorDynamicConfigView coordinatorDynamicConfigView;
+
+  @Inject
+  public DruidInternalDynamicConfigResource(CoordinatorDynamicConfigView 
coordinatorDynamicConfigView)
+  {
+    this.coordinatorDynamicConfigView = coordinatorDynamicConfigView;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(ConfigResourceFilter.class)
+  @Path("/coordinatorDynamicConfig")
+  public Response getDatasource()
+  {
+    return Response.ok(coordinatorDynamicConfigView.getConfig()).build();
+  }
+
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ResourceFilters(ConfigResourceFilter.class)
+  @Path("/coordinatorDynamicConfig")
+  public Response setDatasource(final CoordinatorDynamicConfig dynamicConfig)

Review Comment:
   setDynamicConfig?



##########
server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.client.CoordinatorDynamicConfigView;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/druid-internal/v1/dynamicConfiguration")
+public class DruidInternalDynamicConfigResource
+{
+  private final CoordinatorDynamicConfigView coordinatorDynamicConfigView;
+
+  @Inject
+  public DruidInternalDynamicConfigResource(CoordinatorDynamicConfigView 
coordinatorDynamicConfigView)
+  {
+    this.coordinatorDynamicConfigView = coordinatorDynamicConfigView;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(ConfigResourceFilter.class)
+  @Path("/coordinatorDynamicConfig")
+  public Response getDatasource()

Review Comment:
   getDynamicConfig() ?



##########
server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
+import org.apache.druid.rpc.FixedServiceLocator;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.net.URL;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Updates all brokers with the latest coordinator dynamic config.
+ */
+public class CoordinatorDynamicConfigSyncer
+{
+  private static final Logger log = new 
Logger(CoordinatorDynamicConfigSyncer.class);
+
+  private final CoordinatorConfigManager configManager;
+  private final ObjectMapper jsonMapper;
+  private final DruidNodeDiscoveryProvider druidNodeDiscovery;
+
+  private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = 
new AtomicReference<>();
+  private final ServiceClientFactory clientFactory;
+  private final ExecutorService exec;
+  private final Set<String> inSyncBrokers;
+
+  @Inject
+  public CoordinatorDynamicConfigSyncer(
+      @EscalatedGlobal final ServiceClientFactory clientFactory,
+      final CoordinatorConfigManager configManager,
+      @Json final ObjectMapper jsonMapper,
+      final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
+  )
+  {
+    this.clientFactory = clientFactory;
+    this.configManager = configManager;
+    this.jsonMapper = jsonMapper;
+    this.druidNodeDiscovery = druidNodeDiscoveryProvider;
+    this.exec = Execs.singleThreaded("DynamicConfigSyncer-%d");
+    this.inSyncBrokers = ConcurrentHashMap.newKeySet();
+  }
+
+  public void broadcastConfigToBrokers()
+  {
+    invalidateInSyncBrokersIfNeeded();
+    for (ServiceLocation broker : getKnownBrokers()) {
+      exec.submit(() -> pushConfigToBroker(broker));
+    }
+  }
+
+  public synchronized Set<String> getInSyncBrokers()
+  {
+    return Set.copyOf(inSyncBrokers);
+  }
+
+  private void pushConfigToBroker(ServiceLocation brokerLocation)
+  {
+    final ServiceClient brokerClient = clientFactory.makeClient(
+        NodeRole.BROKER.getJsonName(),
+        new FixedServiceLocator(brokerLocation),
+        StandardRetryPolicy.builder().maxAttempts(6).build()
+    );
+
+    try {
+      CoordinatorDynamicConfig currentDynamicConfig = 
configManager.getCurrentDynamicConfig();
+      final RequestBuilder requestBuilder =
+          new RequestBuilder(HttpMethod.POST, 
"/druid-internal/v1/dynamicConfiguration/coordinatorDynamicConfig")
+              .jsonContent(jsonMapper, currentDynamicConfig);
+
+      final BytesFullResponseHolder responseHolder = 
brokerClient.request(requestBuilder, new BytesFullResponseHandler());
+      final HttpResponseStatus status = responseHolder.getStatus();
+      if (status.equals(HttpResponseStatus.OK)) {
+        addToInSyncBrokers(currentDynamicConfig, brokerLocation);
+      } else {
+        log.error(
+            "Received status [%s] while posting dynamic configs to broker[%s]",
+            status.getCode(),
+            brokerLocation
+        );
+      }
+    }
+    catch (Exception e) {
+      // Catch and ignore the exception, wait for the next sync.
+      log.error(
+          e,
+          "Exception while syncing dynamic configuration to broker[%s]",
+          brokerLocation
+      );
+    }
+  }
+
+  private Set<ServiceLocation> getKnownBrokers()
+  {
+    return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER)
+                             .getAllNodes()
+                             .stream()
+                             .map(DiscoveryDruidNode::toServiceLocation)
+                             .collect(Collectors.toSet());
+  }
+
+  private synchronized void invalidateInSyncBrokersIfNeeded()
+  {
+    final CoordinatorDynamicConfig currentDynamicConfig = 
configManager.getCurrentDynamicConfig();
+    if (!currentDynamicConfig.equals(lastKnownConfig.get())) {
+      // Config has changed, clear the inSync list.
+      lastKnownConfig.set(currentDynamicConfig);
+      inSyncBrokers.clear();

Review Comment:
   Order should be reversed as just good coding practice. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to