kfaraz commented on code in PR #17899: URL: https://github.com/apache/druid/pull/17899#discussion_r2063066336
########## docs/querying/query-context.md: ########## @@ -66,6 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters |`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query | |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| +|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `clonesPreferred`. `excludeClones` means that clone Historicals are not queried by the broker. `clonesPreferred` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. 
This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.| Review Comment: For follow up: Use `preferClones` instead of `clonesPreferred`. ########## server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.rpc.ServiceLocation; + +import java.util.Objects; + +public class BrokerSyncStatus +{ + private final String host; + private final int port; + private final long syncTimeInMs; Review Comment: cc: @adarshsanjeev , note for follow up ########## server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.client.coordinator.Coordinator; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.CoordinatorClientImpl; +import org.apache.druid.client.selector.HistoricalFilter; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.error.DruidException; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.CloneQueryMode; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.BrokerDynamicConfigResource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.validation.constraints.NotNull; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Broker view of 
the coordinator dynamic configuration, and its derived values such as target and source clone servers. + * This class is registered as a managed lifecycle to fetch the coordinator dynamic configuration on startup. Further + * updates are handled through {@link BrokerDynamicConfigResource}. + */ +public class BrokerViewOfCoordinatorConfig implements HistoricalFilter +{ + private static final Logger log = new Logger(BrokerViewOfCoordinatorConfig.class); + private final CoordinatorClient coordinatorClient; + + @GuardedBy("this") + private CoordinatorDynamicConfig config; + @GuardedBy("this") + private Set<String> targetCloneServers; + @GuardedBy("this") + private Set<String> sourceCloneServers; + + @Inject + public BrokerViewOfCoordinatorConfig( + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final ServiceClientFactory clientFactory, + @Coordinator final ServiceLocator serviceLocator + ) + { + this.coordinatorClient = + new CoordinatorClientImpl( + clientFactory.makeClient( + NodeRole.COORDINATOR.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(15).build() + ), + jsonMapper + ); + } + + @VisibleForTesting + public BrokerViewOfCoordinatorConfig(CoordinatorClient coordinatorClient) + { + this.coordinatorClient = coordinatorClient; + } + + /** + * Return the latest {@link CoordinatorDynamicConfig}. + */ + public synchronized CoordinatorDynamicConfig getDynamicConfig() + { + return config; + } + + /** + * Update the config view with a new coordinator dynamic config snapshot. Also updates the source and target clone + * servers based on the new dynamic configuration. 
+ */ + public synchronized void setDynamicConfig(@NotNull CoordinatorDynamicConfig updatedConfig) + { + config = updatedConfig; + final Map<String, String> cloneServers = config.getCloneServers(); + this.targetCloneServers = ImmutableSet.copyOf(cloneServers.keySet()); + this.sourceCloneServers = ImmutableSet.copyOf(cloneServers.values()); + } + + @LifecycleStart + public void start() + { + try { + log.info("Fetching coordinator dynamic configuration."); + + CoordinatorDynamicConfig coordinatorDynamicConfig = coordinatorClient.getCoordinatorDynamicConfig().get(); + setDynamicConfig(coordinatorDynamicConfig); + + log.info("Successfully fetched coordinator dynamic config[%s].", coordinatorDynamicConfig); + } + catch (Exception e) { + // If the fetch fails, the broker should not serve queries. Throw the exception and try again on restart. + throw new RuntimeException("Failed to initialize coordinator dynamic config", e); + } + } + + @Override + public Int2ObjectRBTreeMap<Set<QueryableDruidServer>> getQueryableServers( + Int2ObjectRBTreeMap<Set<QueryableDruidServer>> historicalServers, + CloneQueryMode mode + ) + { + final Set<String> serversToIgnore = getCurrentServersToIgnore(mode); + + if (serversToIgnore.isEmpty()) { + return historicalServers; + } + + final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals = new Int2ObjectRBTreeMap<>(); + for (int priority : historicalServers.keySet()) { + Set<QueryableDruidServer> servers = historicalServers.get(priority); + filteredHistoricals.put(priority, + servers.stream() + .filter(server -> !serversToIgnore.contains(server.getServer().getHost())) + .collect(Collectors.toSet()) + ); + } + + return filteredHistoricals; + } + + /** + * Get the list of servers that should not be queried based on the cloneQueryMode parameter. 
+ */ + private synchronized Set<String> getCurrentServersToIgnore(CloneQueryMode cloneQueryMode) + { + switch (cloneQueryMode) { + case PREFERCLONES: + // Remove servers being cloned targets, so that clones are queried. + return sourceCloneServers; + case EXCLUDECLONES: + // Remove clones, so that only source servers are queried. + return targetCloneServers; + case INCLUDECLONES: + // Don't remove either. + return Set.of(); + default: + throw DruidException.defensive("Unexpected value: [%s]", cloneQueryMode); Review Comment: follow up: ```suggestion throw DruidException.defensive("Unexpected value of cloneQueryMode[%s]", cloneQueryMode); ``` ########## processing/src/main/java/org/apache/druid/query/QueryContexts.java: ########## @@ -149,6 +150,7 @@ public class QueryContexts public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true; public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true; public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false; + public static final CloneQueryMode DEFAULT_CLONE_QUERY_MODE = CloneQueryMode.EXCLUDECLONES; Review Comment: Note for follow-up: Should the default be exclude or include? ########## server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.rpc.ServiceLocation; + +import java.util.Objects; + +public class BrokerSyncStatus +{ + private final String host; + private final int port; + private final long syncTimeInMs; Review Comment: No, unfortunately, it is not documented anywhere. I agree that calling out the units removes the ambiguity, esp. when it comes to specifying durations. (Side note: when writing configs for durations, we should either just call out the unit in the config name or accept a `Duration` or `Period` in the config value. We should probably document this somewhere as a Druid coding guideline.) But I actually had other concerns with this field name. `syncTimeInMs` sounds more like the _duration_ it took to finish a sync, rather than the last sync timestamp. So we should definitely call it `lastSyncTimestamp`. For clarity, we can add the unit suffix and call it `lastSyncTimestampMillis`. ########## server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.broker.BrokerClient; +import org.apache.druid.client.broker.BrokerClientImpl; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.annotation.Nullable; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + 
+/** + * Updates all brokers with the latest coordinator dynamic config. + */ +public class CoordinatorDynamicConfigSyncer +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class); + + private final CoordinatorConfigManager configManager; + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + + private final ServiceClientFactory clientFactory; + private final ScheduledExecutorService exec; + private @Nullable Future<?> syncFuture = null; + + @GuardedBy("this") + private final Set<BrokerSyncStatus> inSyncBrokers; + private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference<>(); + + @Inject + public CoordinatorDynamicConfigSyncer( + @EscalatedGlobal final ServiceClientFactory clientFactory, + final CoordinatorConfigManager configManager, + @Json final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + ) + { + this.clientFactory = clientFactory; + this.configManager = configManager; + this.jsonMapper = jsonMapper; + this.druidNodeDiscovery = druidNodeDiscoveryProvider; + this.exec = Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d"); Review Comment: Note for follow up: There should preferably be a lifecycle stop method which shuts down this executor, as it allows for nicer cleanup of the resources used by this class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
