jtuglu1 commented on code in PR #19011: URL: https://github.com/apache/druid/pull/19011#discussion_r2898545575
########## server/src/main/java/org/apache/druid/server/http/BaseDynamicConfigSyncer.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.client.broker.BrokerClient; +import org.apache.druid.client.broker.BrokerClientImpl; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.DruidNode; +import 
org.apache.druid.server.coordinator.stats.CoordinatorStat; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Base class for syncing dynamic configuration to all brokers. + * Subclasses must implement: + * - {@link #getCurrentConfig()} to provide the latest config + * - {@link #pushConfigToBroker(BrokerClient, Object)} to push config via the appropriate BrokerClient method + * - {@link #getConfigTypeName()} for logging + * + * @param <T> the type of dynamic configuration (e.g., CoordinatorDynamicConfig, BrokerDynamicConfig) + */ +public abstract class BaseDynamicConfigSyncer<T> +{ + private static final Logger log = new Logger(BaseDynamicConfigSyncer.class); + + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + private final ServiceClientFactory clientFactory; + private final ScheduledExecutorService exec; + private final ServiceEmitter emitter; + private @Nullable Future<?> syncFuture = null; + + @GuardedBy("this") + private final Set<BrokerSyncStatus> inSyncBrokers; + private final AtomicReference<T> lastKnownConfig = new AtomicReference<>(); + + protected BaseDynamicConfigSyncer( + final ServiceClientFactory clientFactory, + final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + final ServiceEmitter emitter, + final ScheduledExecutorService exec + ) + { + this.clientFactory = clientFactory; + this.jsonMapper = jsonMapper; + this.druidNodeDiscovery = druidNodeDiscoveryProvider; + this.emitter = emitter; + this.exec = 
exec; + this.inSyncBrokers = ConcurrentHashMap.newKeySet(); + } + + /** + * Get the current configuration to broadcast to brokers. + */ + protected abstract T getCurrentConfig(); + + /** + * Push the config to a broker using the appropriate BrokerClient method. + * @return true if the push was successful + */ + protected abstract boolean pushConfigToBroker(BrokerClient brokerClient, T config) throws Exception; + + /** + * Get the name of this config type for logging (e.g., "coordinator dynamic configuration", "broker dynamic configuration"). + */ + protected abstract String getConfigTypeName(); + + /** + * Queues the configuration sync to the brokers without blocking the calling thread. + */ + public void queueBroadcastConfigToBrokers() + { + exec.submit(this::broadcastConfigToBrokers); + } + + /** + * Push the latest dynamic config to all currently known Brokers. Also + * invalidates the set of inSyncBrokers if the config has changed. + */ + @VisibleForTesting + public void broadcastConfigToBrokers() + { + invalidateInSyncBrokersIfNeeded(); + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (DiscoveryDruidNode broker : getKnownBrokers()) { + pushConfigToBrokerNode(broker); + } + emitStat( + Stats.Configuration.TOTAL_SYNC_TIME, + RowKey.empty(), + stopwatch.millisElapsed() + ); + } + + /** + * Returns the set of Brokers which have been updated with the latest config. + */ + public synchronized Set<BrokerSyncStatus> getInSyncBrokers() + { + return Set.copyOf(inSyncBrokers); + } + + /** + * Schedules a periodic sync with brokers when the coordinator becomes the leader. + */ + public void onLeaderStart() + { + log.info("Starting %s syncing to brokers on leader node.", getConfigTypeName()); Review Comment: nit: `Starting syncing config [%s] to brokers on leader node` ########## docs/configuration/index.md: ########## @@ -1862,6 +1862,89 @@ queries in order to avoid running as a default priority of 0. 
|--------|-----------|-------| |`druid.broker.internal.query.config.context`|A string formatted `key:value` map of a query context to add to internally generated broker queries.|null| +#### Dynamic configuration Review Comment: let's move this section to `docs/api-reference/dynamic-configuration-api.md`. ########## server/src/main/java/org/apache/druid/server/http/BrokerDynamicConfigSyncer.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.client.broker.BrokerClient; +import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.server.broker.BrokerDynamicConfig; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Syncs broker dynamic configuration to all brokers. + */ +public class BrokerDynamicConfigSyncer extends BaseDynamicConfigSyncer<BrokerDynamicConfig> +{ + private final AtomicReference<BrokerDynamicConfig> currentConfig; + + @Inject + public BrokerDynamicConfigSyncer( + @EscalatedGlobal final ServiceClientFactory clientFactory, + final JacksonConfigManager configManager, + @Json final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + final ServiceEmitter emitter + ) + { + super( + clientFactory, + jsonMapper, + druidNodeDiscoveryProvider, + emitter, + Execs.scheduledSingleThreaded("BrokerDynamicConfigSyncer-%d") + ); + this.currentConfig = configManager.watch( + BrokerDynamicConfig.CONFIG_KEY, + BrokerDynamicConfig.class, + new BrokerDynamicConfig(null) + ); + } + + @Override + protected BrokerDynamicConfig getCurrentConfig() + { + return currentConfig.get(); + } + + @Override + protected boolean pushConfigToBroker(BrokerClient brokerClient, BrokerDynamicConfig config) throws Exception + { + return brokerClient.updateBrokerDynamicConfig(config).get(); + } + + @Override + protected String getConfigTypeName() + { + return "broker dynamic configuration"; Review Comment: same thing here about type name ########## 
server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -86,179 +47,31 @@ public CoordinatorDynamicConfigSyncer( final ServiceEmitter emitter ) { - this.clientFactory = clientFactory; - this.configManager = configManager; - this.jsonMapper = jsonMapper; - this.druidNodeDiscovery = druidNodeDiscoveryProvider; - this.exec = Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d"); - this.inSyncBrokers = ConcurrentHashMap.newKeySet(); - this.emitter = emitter; - } - - /** - * Queues the configuration sync to the brokers without blocking the calling thread. - */ - public void queueBroadcastConfigToBrokers() - { - exec.submit(this::broadcastConfigToBrokers); - } - - /** - * Push the latest coordinator dynamic config, provided by the configManager to all currently known Brokers. Also - * invalidates the set of inSyncBrokers if the config has changed. - */ - @VisibleForTesting - void broadcastConfigToBrokers() - { - invalidateInSyncBrokersIfNeeded(); - final Stopwatch stopwatch = Stopwatch.createStarted(); - for (DiscoveryDruidNode broker : getKnownBrokers()) { - pushConfigToBroker(broker); - } - emitStat( - Stats.Configuration.TOTAL_SYNC_TIME, - RowKey.empty(), - stopwatch.millisElapsed() - ); - } - - /** - * Returns the set of Brokers which have been updated with the latest {@link CoordinatorDynamicConfig}. - */ - public synchronized Set<BrokerSyncStatus> getInSyncBrokers() - { - return Set.copyOf(inSyncBrokers); - } - - /** - * Schedules a periodic sync with brokers when the coordinator becomes the leader. - */ - public void onLeaderStart() - { - log.info("Starting coordinator config syncing to brokers on leader node."); - syncFuture = exec.scheduleAtFixedRate( - this::broadcastConfigToBrokers, - 30L, - 60L, - TimeUnit.SECONDS - ); - } - - /** - * Stops the sync when coordinator stops being the leader. 
- */ - public void onLeaderStop() - { - log.info("Not leader, stopping coordinator config syncing to brokers."); - if (syncFuture != null) { - syncFuture.cancel(true); - } - } - - @LifecycleStop - public void stop() - { - exec.shutdownNow(); - } - - /** - * Push the latest coordinator dynamic config, provided by the configManager to the Broker at the brokerLocation - * param. - */ - private void pushConfigToBroker(DiscoveryDruidNode broker) - { - final Stopwatch stopwatch = Stopwatch.createStarted(); - final ServiceLocation brokerLocation = CoordinatorDynamicConfigSyncer.convertDiscoveryNodeToServiceLocation(broker); - final BrokerClient brokerClient = new BrokerClientImpl( - clientFactory.makeClient( - NodeRole.BROKER.getJsonName(), - new FixedServiceLocator(brokerLocation), - StandardRetryPolicy.builder().maxAttempts(6).build() - ), - jsonMapper - ); - - try { - CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig(); - boolean success = brokerClient.updateCoordinatorDynamicConfig(currentDynamicConfig).get(); - if (success) { - markBrokerAsSynced(currentDynamicConfig, brokerLocation); - } - } - catch (Exception e) { - // Catch and ignore the exception, wait for the next sync. - log.error(e, "Exception while syncing dynamic configuration to broker[%s]", brokerLocation); - emitStat( - Stats.Configuration.BROKER_SYNC_ERROR, - RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(), - 1 - ); - } - emitStat( - Stats.Configuration.BROKER_SYNC_TIME, - RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(), - stopwatch.millisElapsed() + super( + clientFactory, + jsonMapper, + druidNodeDiscoveryProvider, + emitter, + Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d") ); + this.configManager = configManager; } - /** - * Returns a collection of {@link DiscoveryDruidNode} for all brokers currently known to the druidNodeDiscovery. 
- */ - private Collection<DiscoveryDruidNode> getKnownBrokers() - { - return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER) - .getAllNodes(); - } - - /** - * Clears the set of inSyncBrokers and updates the lastKnownConfig if the latest coordinator dynamic config is - * different from the config tracked by this class. - */ - private synchronized void invalidateInSyncBrokersIfNeeded() - { - final CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig(); - if (!currentDynamicConfig.equals(lastKnownConfig.get())) { - // Config has changed, clear the inSync list. - inSyncBrokers.clear(); - lastKnownConfig.set(currentDynamicConfig); - } - } - - /** - * Adds a broker to the set of inSyncBrokers if the coordinator dynamic config has not changed. - */ - private synchronized void markBrokerAsSynced(CoordinatorDynamicConfig config, ServiceLocation broker) + @Override + protected CoordinatorDynamicConfig getCurrentConfig() { - if (config.equals(lastKnownConfig.get())) { - inSyncBrokers.add(new BrokerSyncStatus(broker, System.currentTimeMillis())); - } + return configManager.getCurrentDynamicConfig(); } - /** - * Utility method to convert {@link DiscoveryDruidNode} to a {@link ServiceLocation} - */ - @Nullable - private static ServiceLocation convertDiscoveryNodeToServiceLocation(DiscoveryDruidNode discoveryDruidNode) + @Override + protected boolean pushConfigToBroker(BrokerClient brokerClient, CoordinatorDynamicConfig config) throws Exception { - final DruidNode druidNode = discoveryDruidNode.getDruidNode(); - if (druidNode == null) { - return null; - } - - return new ServiceLocation( - druidNode.getHost(), - druidNode.getPlaintextPort(), - druidNode.getTlsPort(), - "" - ); + return brokerClient.updateCoordinatorDynamicConfig(config).get(); } - private void emitStat(CoordinatorStat stat, RowKey rowKey, long value) + @Override + protected String getConfigTypeName() { - ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder(); 
- rowKey.getValues().forEach( - (dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue) - ); - emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value)); + return "coordinator dynamic configuration"; Review Comment: maybe make this a single phrase? `coordinator` or `broker` would show up better in logs/metric dimensions. ########## server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.broker; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.server.QueryBlocklistRule; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Dynamic configuration for brokers that can be changed at runtime. + * This configuration is stored in the metadata store and managed through the coordinator, + * but is consumed by brokers. 
+ * + * @see org.apache.druid.common.config.JacksonConfigManager + */ +public class BrokerDynamicConfig +{ + public static final String CONFIG_KEY = "broker.config"; + + /** + * List of query blocklist rules for blocking queries on brokers dynamically. + */ + private final List<QueryBlocklistRule> queryBlocklist; + + @JsonCreator + public BrokerDynamicConfig( + @JsonProperty("queryBlocklist") @Nullable List<QueryBlocklistRule> queryBlocklist + ) + { + this.queryBlocklist = Builder.valueOrDefault(queryBlocklist, Collections.emptyList()); + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public List<QueryBlocklistRule> getQueryBlocklist() + { + return queryBlocklist; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BrokerDynamicConfig that = (BrokerDynamicConfig) o; + return Objects.equals(queryBlocklist, that.queryBlocklist); + } + + @Override + public int hashCode() + { + return Objects.hash(queryBlocklist); + } + + @Override + public String toString() + { + return "BrokerDynamicConfig{" + + "queryBlocklist=" + queryBlocklist + + '}'; + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private List<QueryBlocklistRule> queryBlocklist; + + public Builder() + { + } + + @JsonCreator + public Builder( + @JsonProperty("queryBlocklist") @Nullable List<QueryBlocklistRule> queryBlocklist + ) + { + this.queryBlocklist = queryBlocklist; + } + + public Builder withQueryBlocklist(List<QueryBlocklistRule> queryBlocklist) + { + this.queryBlocklist = queryBlocklist; + return this; + } + + public BrokerDynamicConfig build() + { + return new BrokerDynamicConfig(queryBlocklist); + } + + /** + * Builds a BrokerDynamicConfig using either the configured values, or + * the default value if not configured. 
+ */ + public BrokerDynamicConfig build(@Nullable BrokerDynamicConfig defaults) + { + return new BrokerDynamicConfig( + valueOrDefault(queryBlocklist, defaults != null ? defaults.getQueryBlocklist() : null) + ); + } + + private static <T> T valueOrDefault(@Nullable T value, @NotNull T defaultValue) Review Comment: nit: let's use `Configs.valueOrDefault` ########## server/src/main/java/org/apache/druid/server/http/BaseDynamicConfigSyncer.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.client.broker.BrokerClient; +import org.apache.druid.client.broker.BrokerClientImpl; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordinator.stats.CoordinatorStat; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Base class for syncing dynamic configuration to all brokers. 
+ * Subclasses must implement: + * - {@link #getCurrentConfig()} to provide the latest config + * - {@link #pushConfigToBroker(BrokerClient, Object)} to push config via the appropriate BrokerClient method + * - {@link #getConfigTypeName()} for logging + * + * @param <T> the type of dynamic configuration (e.g., CoordinatorDynamicConfig, BrokerDynamicConfig) + */ +public abstract class BaseDynamicConfigSyncer<T> +{ + private static final Logger log = new Logger(BaseDynamicConfigSyncer.class); + + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + private final ServiceClientFactory clientFactory; + private final ScheduledExecutorService exec; + private final ServiceEmitter emitter; + private @Nullable Future<?> syncFuture = null; + + @GuardedBy("this") + private final Set<BrokerSyncStatus> inSyncBrokers; + private final AtomicReference<T> lastKnownConfig = new AtomicReference<>(); + + protected BaseDynamicConfigSyncer( + final ServiceClientFactory clientFactory, + final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + final ServiceEmitter emitter, + final ScheduledExecutorService exec + ) + { + this.clientFactory = clientFactory; + this.jsonMapper = jsonMapper; + this.druidNodeDiscovery = druidNodeDiscoveryProvider; + this.emitter = emitter; + this.exec = exec; + this.inSyncBrokers = ConcurrentHashMap.newKeySet(); + } + + /** + * Get the current configuration to broadcast to brokers. + */ + protected abstract T getCurrentConfig(); + + /** + * Push the config to a broker using the appropriate BrokerClient method. + * @return true if the push was successful + */ + protected abstract boolean pushConfigToBroker(BrokerClient brokerClient, T config) throws Exception; + + /** + * Get the name of this config type for logging (e.g., "coordinator dynamic configuration", "broker dynamic configuration"). 
+ */ + protected abstract String getConfigTypeName(); + + /** + * Queues the configuration sync to the brokers without blocking the calling thread. + */ + public void queueBroadcastConfigToBrokers() + { + exec.submit(this::broadcastConfigToBrokers); + } + + /** + * Push the latest dynamic config to all currently known Brokers. Also + * invalidates the set of inSyncBrokers if the config has changed. + */ + @VisibleForTesting + public void broadcastConfigToBrokers() + { + invalidateInSyncBrokersIfNeeded(); + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (DiscoveryDruidNode broker : getKnownBrokers()) { + pushConfigToBrokerNode(broker); + } + emitStat( + Stats.Configuration.TOTAL_SYNC_TIME, Review Comment: maybe now that we are syncing 2 types of configs to brokers, we should make the metric distinguish which config type is being pushed? I'm leaning more towards a dimension rather than a separate metric. ########## server/src/test/java/org/apache/druid/server/BrokerDynamicConfigResourceTest.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import org.apache.druid.client.BrokerViewOfBrokerConfig; +import org.apache.druid.client.BrokerViewOfCoordinatorConfig; +import org.apache.druid.server.broker.BrokerDynamicConfig; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.ws.rs.core.Response; + +public class BrokerDynamicConfigResourceTest +{ + private BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; + private BrokerViewOfBrokerConfig brokerViewOfBrokerConfig; + private BrokerDynamicConfigResource resource; + + @Before + public void setUp() + { + brokerViewOfCoordinatorConfig = Mockito.mock(BrokerViewOfCoordinatorConfig.class); + brokerViewOfBrokerConfig = Mockito.mock(BrokerViewOfBrokerConfig.class); + resource = new BrokerDynamicConfigResource( + brokerViewOfCoordinatorConfig, + brokerViewOfBrokerConfig + ); + } + + @Test + public void testGetBrokerDynamicConfig() + { + BrokerDynamicConfig config = new BrokerDynamicConfig(null); + Mockito.when(brokerViewOfBrokerConfig.getDynamicConfig()).thenReturn(config); + + Response response = resource.getBrokerDynamicConfig(); + + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(config, response.getEntity()); + Mockito.verify(brokerViewOfBrokerConfig, Mockito.times(1)).getDynamicConfig(); + } + + @Test + public void testSetBrokerDynamicConfig() + { + BrokerDynamicConfig config = new BrokerDynamicConfig(null); + + Response response = resource.setBrokerDynamicConfig(config); + + Assert.assertEquals(200, response.getStatus()); + Mockito.verify(brokerViewOfBrokerConfig, Mockito.times(1)).setDynamicConfig(config); + } + + @Test + public void testGetDynamicConfig() Review Comment: let's maybe rename these get/set to have coordinator dynamic in them (distinguish from broker dynamic). 
########## server/src/main/java/org/apache/druid/client/BaseBrokerViewOfConfig.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.validation.constraints.NotNull; + +/** + * Base class for broker's view of dynamic configuration fetched from the Coordinator. + * Subclasses must implement: + * {@link #fetchConfigFromClient()} to fetch configuration from their specific client</li> + * {@link #getConfigTypeName()} for logging purposes</li> + * + * @param <T> the type of dynamic configuration (e.g., CoordinatorDynamicConfig, BrokerDynamicConfig) + */ +public abstract class BaseBrokerViewOfConfig<T> Review Comment: could we rename this type parameter to better match what it represents? e.g. `DynamicConfig` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
