This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch refactor_ds_lookup_holder in repository https://gitbox.apache.org/repos/asf/druid.git
commit 64b8b734109bead03ee0d75dc670b4515454b000 Author: Abhishek Balaji Radhakrishnan <[email protected]> AuthorDate: Sun Nov 9 20:20:34 2025 -0800 Move lookup and broadcast datasource load spec configs to its own class to avoid confusion. --- .../lookup/LookupListeningAnnouncerConfig.java | 17 ++++++---- .../query/lookup/LookupReferencesManager.java | 4 +-- .../segment/realtime/ChatHandlerResource.java | 6 ++-- .../coordination/SegmentCacheBootstrapper.java | 14 ++++---- .../jetty/ChatHandlerServerModule.java | 6 ++-- .../jetty/CliIndexerServerModule.java | 4 +-- .../initialization/jetty/JettyServerModule.java | 6 ++-- .../druid/server/metrics/GroupByStatsMonitor.java | 6 ++-- ...SourceTaskIdHolder.java => LoadSpecHolder.java} | 39 +++------------------- .../apache/druid/server/metrics/MetricsModule.java | 36 ++++++++++---------- ...TaskIdHolder.java => TaskPropertiesHolder.java} | 31 +++-------------- .../lookup/LookupListeningAnnouncerConfigTest.java | 13 ++++---- .../segment/realtime/ChatHandlerResourceTest.java | 8 ++--- .../SegmentCacheBootstrapperCacheTest.java | 8 ++--- .../coordination/SegmentCacheBootstrapperTest.java | 19 ++++++----- .../server/metrics/GroupByStatsMonitorTest.java | 16 ++++----- .../druid/server/metrics/MetricsModuleTest.java | 8 ++--- .../main/java/org/apache/druid/cli/CliPeon.java | 11 +++--- 18 files changed, 105 insertions(+), 147 deletions(-) diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java index 0f6ef52fc50..2fdb8fe94dc 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java @@ -25,12 +25,15 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.StringUtils; import 
org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; +import org.apache.druid.server.metrics.TaskPropertiesHolder; class LookupListeningAnnouncerConfig { public static final String DEFAULT_TIER = "__default"; - private final DataSourceTaskIdHolder dataSourceTaskIdHolder; + private final LoadSpecHolder loadSpecHolder; + private final TaskPropertiesHolder taskPropsHolder; + @JsonProperty("lookupTier") private String lookupTier = null; @JsonProperty("lookupTierIsDatasource") @@ -38,10 +41,12 @@ class LookupListeningAnnouncerConfig @JsonCreator public LookupListeningAnnouncerConfig( - @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder + @JacksonInject LoadSpecHolder loadSpecHolder, + @JacksonInject TaskPropertiesHolder taskPropsHolder ) { - this.dataSourceTaskIdHolder = dataSourceTaskIdHolder; + this.loadSpecHolder = loadSpecHolder; + this.taskPropsHolder = taskPropsHolder; } public String getLookupTier() @@ -50,7 +55,7 @@ class LookupListeningAnnouncerConfig !(lookupTierIsDatasource && null != lookupTier), "Cannot specify both `lookupTier` and `lookupTierIsDatasource`" ); - final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier; + final String lookupTier = lookupTierIsDatasource ? taskPropsHolder.getDataSource() : this.lookupTier; return Preconditions.checkNotNull( lookupTier == null ? 
DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier), @@ -61,6 +66,6 @@ class LookupListeningAnnouncerConfig public LookupLoadingSpec getLookupLoadingSpec() { - return dataSourceTaskIdHolder.getLookupLoadingSpec(); + return loadSpecHolder.getLookupLoadingSpec(); } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java index 94c333122b7..22ee57313ec 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java @@ -42,7 +42,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; import javax.annotation.Nullable; import java.io.File; @@ -380,7 +380,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP } /** - * Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}. + * Load a set of lookups based on the injected value in {@link LoadSpecHolder#getLookupLoadingSpec()}. 
*/ private void loadLookupsAndInitStateRef() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java index 4423a0d0f0f..99d9d6c627f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java @@ -25,7 +25,7 @@ import com.google.inject.Inject; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.initialization.jetty.BadRequestException; import org.apache.druid.server.initialization.jetty.ServiceUnavailableException; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -42,10 +42,10 @@ public class ChatHandlerResource private final String taskId; @Inject - public ChatHandlerResource(final ChatHandlerProvider handlers, final DataSourceTaskIdHolder taskIdHolder) + public ChatHandlerResource(final ChatHandlerProvider handlers, final TaskPropertiesHolder taskPropsHolder) { this.handlers = handlers; - this.taskId = taskIdHolder.getTaskId(); + this.taskId = taskPropsHolder.getTaskId(); } @Path("/{id}") diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentCacheBootstrapper.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentCacheBootstrapper.java index b58d0641cad..c31df919407 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentCacheBootstrapper.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentCacheBootstrapper.java @@ -39,7 +39,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; -import 
org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -82,7 +82,7 @@ public class SegmentCacheBootstrapper private static final EmittingLogger log = new EmittingLogger(SegmentCacheBootstrapper.class); - private final DataSourceTaskIdHolder datasourceHolder; + private final LoadSpecHolder loadSpecHolder; @Inject public SegmentCacheBootstrapper( @@ -94,7 +94,7 @@ public class SegmentCacheBootstrapper ServerTypeConfig serverTypeConfig, CoordinatorClient coordinatorClient, ServiceEmitter emitter, - DataSourceTaskIdHolder datasourceHolder + LoadSpecHolder loadSpecHolder ) { this.loadDropHandler = loadDropHandler; @@ -105,7 +105,7 @@ public class SegmentCacheBootstrapper this.serverTypeConfig = serverTypeConfig; this.coordinatorClient = coordinatorClient; this.emitter = emitter; - this.datasourceHolder = datasourceHolder; + this.loadSpecHolder = loadSpecHolder; } @LifecycleStart @@ -270,12 +270,12 @@ public class SegmentCacheBootstrapper /** * @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned. - * The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()} + * The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link LoadSpecHolder#getBroadcastDatasourceLoadingSpec()} * if applicable. 
*/ private List<DataSegment> getBootstrapSegments() { - final BroadcastDatasourceLoadingSpec.Mode mode = datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode(); + final BroadcastDatasourceLoadingSpec.Mode mode = loadSpecHolder.getBroadcastDatasourceLoadingSpec().getMode(); if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) { log.info("Skipping fetch of bootstrap segments."); return ImmutableList.of(); @@ -290,7 +290,7 @@ public class SegmentCacheBootstrapper final BootstrapSegmentsResponse response = FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true); if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { - final Set<String> broadcastDatasourcesToLoad = datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad(); + final Set<String> broadcastDatasourcesToLoad = loadSpecHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad(); final List<DataSegment> filteredBroadcast = new ArrayList<>(); response.getIterator().forEachRemaining(segment -> { if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) { diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java index 8ee3d5ba081..86efcf0b63c 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java @@ -35,7 +35,7 @@ import org.apache.druid.segment.realtime.ChatHandlerResource; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import org.apache.druid.server.security.TLSCertificateChecker; import 
org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -83,10 +83,10 @@ public class ChatHandlerServerModule implements Module @Provides @LazySingleton public TaskIdResponseHeaderFilterHolder taskIdResponseHeaderFilterHolderBuilder( - final DataSourceTaskIdHolder taskIdHolder + final TaskPropertiesHolder taskPropsHolder ) { - return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskIdHolder.getTaskId()); + return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskPropsHolder.getTaskId()); } @Provides diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java index 76fd422b32a..56ba7784df9 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java @@ -36,7 +36,7 @@ import org.apache.druid.segment.realtime.ChatHandlerResource; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import org.apache.druid.server.security.TLSCertificateChecker; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -109,7 +109,7 @@ public class CliIndexerServerModule implements Module @Provides @LazySingleton public TaskIdResponseHeaderFilterHolder taskIdResponseHeaderFilterHolderBuilder( - final DataSourceTaskIdHolder taskIdHolder + final TaskPropertiesHolder taskIdHolder ) { return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskIdHolder.getTaskId()); diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 6593e67947e..987f2416536 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -53,9 +53,9 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.StatusResource; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.server.metrics.MonitorsConfig; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import org.apache.druid.server.security.CustomCheckX509TrustManager; import org.apache.druid.server.security.TLSCertificateChecker; import org.eclipse.jetty.server.ConnectionFactory; @@ -517,11 +517,11 @@ public class JettyServerModule extends JerseyServletModule @Provides @LazySingleton - public JettyMonitor getJettyMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder) + public JettyMonitor getJettyMonitor(TaskPropertiesHolder taskPropsHolder) { return new JettyMonitor( MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), taskPropsHolder.getTaskId() ) ); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index 0f07bd2894b..50b56b74dbc 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -49,14 +49,14 @@ public class GroupByStatsMonitor extends AbstractMonitor 
public GroupByStatsMonitor( GroupByStatsProvider groupByStatsProvider, @Merging BlockingPool<ByteBuffer> mergeBufferPool, - DataSourceTaskIdHolder dataSourceTaskIdHolder + TaskPropertiesHolder taskPropsHolder ) { this.groupByStatsProvider = groupByStatsProvider; this.mergeBufferPool = mergeBufferPool; this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), - dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), + taskPropsHolder.getTaskId() ); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/LoadSpecHolder.java similarity index 63% copy from server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java copy to server/src/main/java/org/apache/druid/server/metrics/LoadSpecHolder.java index d5eec77f87f..b63bc568b54 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/LoadSpecHolder.java @@ -24,51 +24,22 @@ import com.google.inject.name.Named; import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import javax.annotation.Nullable; - /** - * This holder is only applicable to {@code CliPeon} servers. + * A holder applicable to all servers that contains load specifications such as {@link LookupLoadingSpec} + * and {@link BroadcastDatasourceLoadingSpec}. 
*/ -public class DataSourceTaskIdHolder +public class LoadSpecHolder { - public static final String DATA_SOURCE_BINDING = "druidDataSource"; - public static final String TASK_ID_BINDING = "druidTaskId"; public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask"; public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = "broadcastDatasourcesToLoadForTask"; - @Named(DATA_SOURCE_BINDING) - @Inject(optional = true) - String dataSource = null; - - @Named(TASK_ID_BINDING) - @Inject(optional = true) - String taskId = null; - @Named(LOOKUPS_TO_LOAD_FOR_TASK) @Inject(optional = true) - LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; + final LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) @Inject(optional = true) - BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; - - /** - * @return the taskId for CliPeon servers; {@code null} for all other servers. - */ - @Nullable - public String getDataSource() - { - return dataSource; - } - - /** - * @return the dataSource for CliPeon servers; {@code null} for all other servers. - */ - @Nullable - public String getTaskId() - { - return taskId; - } + final BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; public LookupLoadingSpec getLookupLoadingSpec() { diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index d6a25bea4dd..8f135b8c6af 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -87,7 +87,7 @@ public class MetricsModule implements Module DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum. 
- binder.bind(DataSourceTaskIdHolder.class).in(LazySingleton.class); + binder.bind(TaskPropertiesHolder.class).in(LazySingleton.class); binder.bind(ExecutorServiceMonitor.class).in(LazySingleton.class); @@ -111,8 +111,8 @@ public class MetricsModule implements Module List<Monitor> monitors = new ArrayList<>(); // HACK: when ServiceStatusMonitor is the first to be loaded, it introduces a circular dependency between // CliPeon.runTask and CliPeon.getDataSourceFromTask/CliPeon.getTaskIDFromTask. The reason for this is unclear - // but by injecting DataSourceTaskIdHolder early this cycle is avoided. - injector.getInstance(DataSourceTaskIdHolder.class); + // but by injecting TaskPropertiesHolder early this cycle is avoided. + injector.getInstance(TaskPropertiesHolder.class); for (Class<? extends Monitor> monitorClass : Iterables.concat(monitorsConfig.getMonitors(), monitorSet)) { if (shouldLoadMonitor(monitorClass, nodeRoles)) { monitors.add(injector.getInstance(monitorClass)); @@ -150,12 +150,12 @@ public class MetricsModule implements Module @Provides @ManageLifecycle public JvmMonitor getJvmMonitor( - DataSourceTaskIdHolder dataSourceTaskIdHolder + TaskPropertiesHolder taskPropsHolder ) { Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), - dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), + taskPropsHolder.getTaskId() ); return new JvmMonitor(dimensions); } @@ -163,37 +163,37 @@ public class MetricsModule implements Module @Provides @ManageLifecycle public JvmCpuMonitor getJvmCpuMonitor( - DataSourceTaskIdHolder dataSourceTaskIdHolder + TaskPropertiesHolder taskPropsHolder ) { Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), - dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), + taskPropsHolder.getTaskId() ); return new JvmCpuMonitor(dimensions); } @Provides @ManageLifecycle - public 
JvmThreadsMonitor getJvmThreadsMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder) + public JvmThreadsMonitor getJvmThreadsMonitor(TaskPropertiesHolder taskPropsHolder) { Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), - dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), + taskPropsHolder.getTaskId() ); return new JvmThreadsMonitor(dimensions); } @Provides @ManageLifecycle - public SysMonitor getSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @Self Set<NodeRole> nodeRoles) + public SysMonitor getSysMonitor(TaskPropertiesHolder taskPropsHolder, @Self Set<NodeRole> nodeRoles) { if (nodeRoles.contains(NodeRole.PEON)) { return new NoopSysMonitor(); } else { Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), - dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), + taskPropsHolder.getTaskId() ); return new SysMonitor(dimensions); } @@ -202,7 +202,7 @@ public class MetricsModule implements Module @Provides @ManageLifecycle public OshiSysMonitor getOshiSysMonitor( - DataSourceTaskIdHolder dataSourceTaskIdHolder, + TaskPropertiesHolder taskPropsHolder, @Self Set<NodeRole> nodeRoles, OshiSysMonitorConfig oshiSysConfig ) @@ -211,8 +211,8 @@ public class MetricsModule implements Module return new NoopOshiSysMonitor(); } else { Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( - dataSourceTaskIdHolder.getDataSource(), - dataSourceTaskIdHolder.getTaskId() + taskPropsHolder.getDataSource(), + taskPropsHolder.getTaskId() ); return new OshiSysMonitor(dimensions, oshiSysConfig); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/TaskPropertiesHolder.java similarity index 60% rename from server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java rename to 
server/src/main/java/org/apache/druid/server/metrics/TaskPropertiesHolder.java index d5eec77f87f..96e25e96764 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskPropertiesHolder.java @@ -21,36 +21,25 @@ package org.apache.druid.server.metrics; import com.google.inject.Inject; import com.google.inject.name.Named; -import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; -import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import javax.annotation.Nullable; /** - * This holder is only applicable to {@code CliPeon} servers. + * A holder for task-specific properties applicable to {@code CliPeon} servers. + * For other server types, the getter methods in this holder will return {@code null}. */ -public class DataSourceTaskIdHolder +public class TaskPropertiesHolder { public static final String DATA_SOURCE_BINDING = "druidDataSource"; public static final String TASK_ID_BINDING = "druidTaskId"; - public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask"; - public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = "broadcastDatasourcesToLoadForTask"; @Named(DATA_SOURCE_BINDING) @Inject(optional = true) - String dataSource = null; + final String dataSource = null; @Named(TASK_ID_BINDING) @Inject(optional = true) - String taskId = null; - - @Named(LOOKUPS_TO_LOAD_FOR_TASK) - @Inject(optional = true) - LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; - - @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) - @Inject(optional = true) - BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; + final String taskId = null; /** * @return the taskId for CliPeon servers; {@code null} for all other servers. 
@@ -69,14 +58,4 @@ public class DataSourceTaskIdHolder { return taskId; } - - public LookupLoadingSpec getLookupLoadingSpec() - { - return lookupLoadingSpec; - } - - public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() - { - return broadcastDatasourceLoadingSpec; - } } diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java index 75cbfa2fd1e..533e56bd6c9 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java +++ b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java @@ -33,7 +33,8 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; import org.apache.druid.server.DruidNode; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -59,12 +60,12 @@ public class LookupListeningAnnouncerConfigTest new DruidNode("test-inject", null, false, null, null, true, false) ); binder - .bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING))) + .bind(Key.get(String.class, Names.named(TaskPropertiesHolder.DATA_SOURCE_BINDING))) .toInstance("some_datasource"); final List<String> lookupsToLoad = Arrays.asList("lookupName1", "lookupName2"); binder.bind(new TypeLiteral<List<String>>() {}) - .annotatedWith(Names.named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK)) + .annotatedWith(Names.named(LoadSpecHolder.LOOKUPS_TO_LOAD_FOR_TASK)) .toInstance(lookupsToLoad); } }, @@ -139,9 +140,9 @@ public class LookupListeningAnnouncerConfigTest @Test public void testLookupsToLoadInjection() { - final 
DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); - injector.injectMembers(dimensionIdHolder); - Assert.assertEquals(LookupLoadingSpec.Mode.ALL, dimensionIdHolder.getLookupLoadingSpec().getMode()); + final LoadSpecHolder loadSpecHolder = new LoadSpecHolder(); + injector.injectMembers(loadSpecHolder); + Assert.assertEquals(LookupLoadingSpec.Mode.ALL, loadSpecHolder.getLookupLoadingSpec().getMode()); } @Test(expected = IllegalArgumentException.class) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java b/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java index 21d99c62225..06324f4eb06 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java @@ -22,7 +22,7 @@ package org.apache.druid.segment.realtime; import com.google.common.base.Optional; import org.apache.druid.server.initialization.jetty.ServiceUnavailableException; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -38,18 +38,18 @@ public class ChatHandlerResourceTest extends EasyMockSupport @Mock ChatHandlerProvider handlers; @Mock - DataSourceTaskIdHolder dataSourceTaskIdHolder; + TaskPropertiesHolder taskPropsHolder; ChatHandlerResource chatHandlerResource; @Test public void test_noHandlerFound() { String handlerId = "handlerId"; - EasyMock.expect(dataSourceTaskIdHolder.getTaskId()).andReturn(null); + EasyMock.expect(taskPropsHolder.getTaskId()).andReturn(null); EasyMock.expect(handlers.get(handlerId)).andReturn(Optional.absent()); replayAll(); - chatHandlerResource = new ChatHandlerResource(handlers, dataSourceTaskIdHolder); + chatHandlerResource = new ChatHandlerResource(handlers, taskPropsHolder); 
Assert.assertThrows(ServiceUnavailableException.class, () -> chatHandlerResource.doTaskChat(handlerId, null)); verifyAll(); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperCacheTest.java index 8c870fcb757..7e33f8cd887 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperCacheTest.java @@ -35,7 +35,7 @@ import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -143,7 +143,7 @@ public class SegmentCacheBootstrapperCacheTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, emitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); bootstrapper.start(); @@ -171,7 +171,7 @@ public class SegmentCacheBootstrapperCacheTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, emitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); bootstrapper.start(); @@ -211,7 +211,7 @@ public class SegmentCacheBootstrapperCacheTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, emitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); bootstrapper.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java index 031f9f5f958..94410903e0d 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java @@ -36,7 +36,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; import org.apache.druid.test.utils.TestSegmentCacheManager; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; @@ -137,7 +137,7 @@ public class SegmentCacheBootstrapperTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -196,7 +196,7 @@ public class SegmentCacheBootstrapperTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -252,7 +252,7 @@ public class SegmentCacheBootstrapperTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -295,7 +295,7 @@ public class SegmentCacheBootstrapperTest binder -> { binder.bindScope(LazySingleton.class, Scopes.SINGLETON); final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.NONE; - binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(LoadSpecHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) 
.toInstance(broadcastMode); } ); @@ -317,7 +317,7 @@ public class SegmentCacheBootstrapperTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - injector.getInstance(DataSourceTaskIdHolder.class) + injector.getInstance(LoadSpecHolder.class) ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -355,7 +355,8 @@ public class SegmentCacheBootstrapperTest binder -> { binder.bindScope(LazySingleton.class, Scopes.SINGLETON); final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("test1")); - binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named( + LoadSpecHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) .toInstance(broadcastMode); } ); @@ -377,7 +378,7 @@ public class SegmentCacheBootstrapperTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - injector.getInstance(DataSourceTaskIdHolder.class) + injector.getInstance(LoadSpecHolder.class) ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -418,7 +419,7 @@ public class SegmentCacheBootstrapperTest new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - new DataSourceTaskIdHolder() + new LoadSpecHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index 5931fba677c..b3d332a7fd1 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -84,7 +84,7 @@ public class GroupByStatsMonitorTest public void testMonitor() { final GroupByStatsMonitor monitor = - new 
GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder()); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new TaskPropertiesHolder()); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); emitter.flush(); @@ -114,16 +114,16 @@ public class GroupByStatsMonitorTest Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false) ); - binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING))) + binder.bind(Key.get(String.class, Names.named(TaskPropertiesHolder.DATA_SOURCE_BINDING))) .toInstance(dataSource); - binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING))) + binder.bind(Key.get(String.class, Names.named(TaskPropertiesHolder.TASK_ID_BINDING))) .toInstance(taskId); }) ); - final DataSourceTaskIdHolder dsTaskIdHolder = new DataSourceTaskIdHolder(); - injector.injectMembers(dsTaskIdHolder); + final TaskPropertiesHolder taskPropertiesHolder = new TaskPropertiesHolder(); + injector.injectMembers(taskPropertiesHolder); final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, dsTaskIdHolder); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, taskPropertiesHolder); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); emitter.flush(); @@ -153,7 +153,7 @@ public class GroupByStatsMonitorTest }).get(20, TimeUnit.SECONDS); final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder()); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new TaskPropertiesHolder()); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); @@ -182,7 +182,7 @@ public class GroupByStatsMonitorTest } final 
GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder()); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new TaskPropertiesHolder()); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java index 9337f04c11e..628fbee00f8 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java @@ -92,7 +92,7 @@ public class MetricsModuleTest } }) ); - final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); + final TaskPropertiesHolder dimensionIdHolder = new TaskPropertiesHolder(); injector.injectMembers(dimensionIdHolder); Assert.assertNull(dimensionIdHolder.getDataSource()); Assert.assertNull(dimensionIdHolder.getTaskId()); @@ -115,14 +115,14 @@ public class MetricsModuleTest Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false) ); - binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING))) + binder.bind(Key.get(String.class, Names.named(TaskPropertiesHolder.DATA_SOURCE_BINDING))) .toInstance(dataSource); - binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING))) + binder.bind(Key.get(String.class, Names.named(TaskPropertiesHolder.TASK_ID_BINDING))) .toInstance(taskId); } }) ); - final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); + final TaskPropertiesHolder dimensionIdHolder = new TaskPropertiesHolder(); injector.injectMembers(dimensionIdHolder); Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource()); Assert.assertEquals(taskId, 
dimensionIdHolder.getTaskId()); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index a5373a9b50a..79f55b660ab 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -130,8 +130,9 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; import org.apache.druid.server.metrics.ServiceStatusMonitor; +import org.apache.druid.server.metrics.TaskPropertiesHolder; import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.tasklogs.TaskPayloadManager; import org.eclipse.jetty.server.Server; @@ -331,7 +332,7 @@ public class CliPeon extends GuiceRunnable @Provides @LazySingleton - @Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING) + @Named(TaskPropertiesHolder.DATA_SOURCE_BINDING) public String getDataSourceFromTask(final Task task) { return task.getDataSource(); @@ -339,7 +340,7 @@ public class CliPeon extends GuiceRunnable @Provides @LazySingleton - @Named(DataSourceTaskIdHolder.TASK_ID_BINDING) + @Named(TaskPropertiesHolder.TASK_ID_BINDING) public String getTaskIDFromTask(final Task task) { return task.getId(); @@ -347,7 +348,7 @@ public class CliPeon extends GuiceRunnable @Provides @LazySingleton - @Named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK) + @Named(LoadSpecHolder.LOOKUPS_TO_LOAD_FOR_TASK) public LookupLoadingSpec getLookupsToLoad(final Task task) { return task.getLookupLoadingSpec(); @@ -355,7 +356,7 @@ public class CliPeon extends GuiceRunnable @Provides @LazySingleton - 
@Named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + @Named(LoadSpecHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task task) { return task.getBroadcastDatasourceLoadingSpec(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org
