This is an automated email from the ASF dual-hosted git repository.
FrankChen021 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5ba191be9f6 perf: disable processing thread renaming by default
(#19518)
5ba191be9f6 is described below
commit 5ba191be9f689a13a16001b3f1b20cc31bd559dd
Author: Virushade <[email protected]>
AuthorDate: Thu May 28 20:47:03 2026 +0800
perf: disable processing thread renaming by default (#19518)
* Add default value for thread enabling
* Peon disable thread renaming
* Add benchmark query types
* Add groupby benchmark
* Specify query type
* Docs for thread
---
.../benchmark/SinkQuerySegmentWalkerBenchmark.java | 358 +++++++++++++++++++--
docs/querying/query-context-reference.md | 3 +-
.../query/spec/SpecificSegmentQueryRunner.java | 3 +-
.../query/spec/SpecificSegmentQueryRunnerTest.java | 99 +++++-
4 files changed, 427 insertions(+), 36 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
index 6b36d72c672..777dd297ecb 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
@@ -19,29 +19,87 @@
package org.apache.druid.benchmark;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.BuiltInTypesModule;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.jackson.AggregatorsModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.LoggingEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
-import org.apache.druid.query.Result;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
+import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
+import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
+import org.apache.druid.query.metadata.metadata.ListColumnIncluderator;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
-import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
+import org.apache.druid.segment.realtime.appenderator.TestAppenderatorConfig;
import org.apache.druid.segment.realtime.sink.Committers;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -59,8 +117,11 @@ import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
+import java.net.URI;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@@ -71,6 +132,18 @@ import java.util.concurrent.TimeUnit;
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class SinkQuerySegmentWalkerBenchmark
{
+ private static final String DATASOURCE = "foo";
+ private static final List<String> QUERY_COLUMNS = ImmutableList.of("__time",
"dim", "count", "met");
+ private static final MultipleIntervalSegmentSpec QUERY_INTERVALS =
+ new
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/2001")));
+ private static final String SET_PROCESSING_THREAD_NAMES =
"setProcessingThreadNames";
+
+ @Param({"timeseries", "scan", "segmentMetadata", "groupBy"})
+ private String queryType;
+
+ @Param({"false", "true"})
+ private boolean setProcessingThreadNames;
+
@Param({"10", "50", "100", "200"})
private int numFireHydrants;
@@ -78,24 +151,66 @@ public class SinkQuerySegmentWalkerBenchmark
private final ServiceEmitter serviceEmitter = new ServiceEmitter("test",
"test", loggingEmitter);
private File cacheDir;
+ private ExecutorService queryExecutor;
private Appenderator appenderator;
+ private TestGroupByBuffers groupByBuffers;
@Setup(Level.Trial)
public void setup() throws Exception
{
final String userConfiguredCacheDir =
System.getProperty("druid.benchmark.cacheDir",
System.getenv("DRUID_BENCHMARK_CACHE_DIR"));
cacheDir = new File(userConfiguredCacheDir);
- final StreamAppenderatorTester tester =
- new StreamAppenderatorTester.Builder().maxRowsInMemory(1)
- .basePersistDirectory(cacheDir)
-
.withServiceEmitter(serviceEmitter)
- .build();
+ FileUtils.deleteDirectory(cacheDir);
+ final ObjectMapper objectMapper = makeObjectMapper();
+ final IndexIO indexIO = new IndexIO(
+ objectMapper,
+ new ColumnConfig()
+ {
+ }
+ );
+ final IndexMergerV9 indexMerger = new IndexMergerV9(
+ objectMapper,
+ indexIO,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ );
+ final DataSchema schema = makeDataSchema();
+ final RowIngestionMeters rowIngestionMeters = new
SimpleRowIngestionMeters();
+ final AppenderatorConfig tuningConfig = makeTuningConfig();
+
+ queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
+ groupByBuffers = TestGroupByBuffers.createDefault();
- appenderator = tester.getAppenderator();
+ serviceEmitter.start();
+ EmittingLogger.registerEmitter(serviceEmitter);
+
+ final QueryRunnerFactoryConglomerate conglomerate =
makeQueryRunnerFactoryConglomerate();
+ appenderator = Appenderators.createRealtime(
+ null,
+ schema.getDataSource(),
+ schema,
+ tuningConfig,
+ new SegmentGenerationMetrics(),
+ makeDataSegmentPusher(),
+ objectMapper,
+ indexIO,
+ indexMerger,
+ conglomerate,
+ new NoopDataSegmentAnnouncer(),
+ serviceEmitter,
+ new ForwardingQueryProcessingPool(queryExecutor),
+ MapCache.create(2048),
+ new CacheConfig(),
+ new CachePopulatorStats(),
+ NoopPolicyEnforcer.instance(),
+ rowIngestionMeters,
+ new ParseExceptionHandler(rowIngestionMeters, false,
Integer.MAX_VALUE, 0),
+ CentralizedDatasourceSchemaConfig.create(),
+ interval -> {}
+ );
appenderator.startJob();
final SegmentIdWithShardSpec segmentIdWithShardSpec = new
SegmentIdWithShardSpec(
- StreamAppenderatorTester.DATASOURCE,
+ DATASOURCE,
Intervals.of("2000/2001"),
"A",
new LinearShardSpec(0)
@@ -119,33 +234,214 @@ public class SinkQuerySegmentWalkerBenchmark
@TearDown(Level.Trial)
public void tearDown() throws Exception
{
- appenderator.close();
- FileUtils.deleteDirectory(cacheDir);
+ try {
+ if (appenderator != null) {
+ appenderator.close();
+ }
+ }
+ finally {
+ if (queryExecutor != null) {
+ queryExecutor.shutdownNow();
+ }
+ try {
+ if (groupByBuffers != null) {
+ groupByBuffers.close();
+ }
+ }
+ finally {
+ FileUtils.deleteDirectory(cacheDir);
+ }
+ }
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
- public void emitSinkMetrics(Blackhole blackhole) throws Exception
+ public void runSinkQuery(Blackhole blackhole) throws Exception
{
- {
- final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
-
.dataSource(StreamAppenderatorTester.DATASOURCE)
-
.intervals(ImmutableList.of(Intervals.of("2000/2001")))
- .aggregators(
- Arrays.asList(
- new
LongSumAggregatorFactory("count", "count"),
- new
LongSumAggregatorFactory("met", "met")
- )
- )
- .granularity(Granularities.DAY)
- .build();
-
- final List<Result<TimeseriesResultValue>> results =
- QueryPlus.wrap(query1).run(appenderator,
ResponseContext.createEmpty()).toList();
- blackhole.consume(results);
-
- serviceEmitter.flush();
+ final Query<?> query = makeQuery();
+ final List<?> results = QueryPlus.wrap(query).run(appenderator,
ResponseContext.createEmpty()).toList();
+ blackhole.consume(results);
+
+ serviceEmitter.flush();
+ }
+
+ private Query<?> makeQuery()
+ {
+ switch (queryType) {
+ case "timeseries":
+ return makeTimeseriesQuery();
+ case "scan":
+ return makeScanQuery();
+ case "segmentMetadata":
+ return makeSegmentMetadataQuery();
+ case "groupBy":
+ return makeGroupByQuery();
+ default:
+ throw new IllegalStateException("Unsupported query type[" + queryType
+ "]");
}
}
+
+ private QueryRunnerFactoryConglomerate makeQueryRunnerFactoryConglomerate()
+ {
+ return DefaultQueryRunnerFactoryConglomerate.buildFromQueryRunnerFactories(
+ ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
+ .put(
+ TimeseriesQuery.class,
+ new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(),
+ new TimeseriesQueryEngine(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ ScanQuery.class,
+ new ScanQueryRunnerFactory(
+ new
ScanQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
+ )
+ )
+ .put(
+ SegmentMetadataQuery.class,
+ new SegmentMetadataQueryRunnerFactory(
+ new SegmentMetadataQueryQueryToolChest(new
SegmentMetadataQueryConfig()),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ GroupByQuery.class,
+ GroupByQueryRunnerTest.makeQueryRunnerFactory(new
GroupByQueryConfig(), groupByBuffers)
+ )
+ .build()
+ );
+ }
+
+ private TimeseriesQuery makeTimeseriesQuery()
+ {
+ return Druids.newTimeseriesQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(QUERY_INTERVALS)
+ .aggregators(makeAggregators())
+ .granularity(Granularities.DAY)
+ .context(makeQueryContext())
+ .build();
+ }
+
+ private ScanQuery makeScanQuery()
+ {
+ return Druids.newScanQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(QUERY_INTERVALS)
+ .columns(QUERY_COLUMNS)
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .context(makeQueryContext())
+ .build();
+ }
+
+ private SegmentMetadataQuery makeSegmentMetadataQuery()
+ {
+ return Druids.newSegmentMetadataQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(QUERY_INTERVALS)
+ .toInclude(new ListColumnIncluderator(QUERY_COLUMNS))
+ .analysisTypes(
+ SegmentMetadataQuery.AnalysisType.CARDINALITY,
+ SegmentMetadataQuery.AnalysisType.SIZE,
+ SegmentMetadataQuery.AnalysisType.INTERVAL,
+ SegmentMetadataQuery.AnalysisType.MINMAX,
+ SegmentMetadataQuery.AnalysisType.AGGREGATORS
+ )
+ .merge(true)
+ .context(makeQueryContext())
+ .build();
+ }
+
+ private GroupByQuery makeGroupByQuery()
+ {
+ return GroupByQuery.builder()
+ .setDataSource(DATASOURCE)
+ .setInterval("2000/2001")
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(makeAggregators())
+ .setContext(makeQueryContext())
+ .build();
+ }
+
+ private List<AggregatorFactory> makeAggregators()
+ {
+ return Arrays.asList(
+ new LongSumAggregatorFactory("count", "count"),
+ new LongSumAggregatorFactory("met", "met")
+ );
+ }
+
+ private Map<String, Object> makeQueryContext()
+ {
+ return ImmutableMap.of(SET_PROCESSING_THREAD_NAMES,
setProcessingThreadNames);
+ }
+
+ private static ObjectMapper makeObjectMapper()
+ {
+ final ObjectMapper objectMapper = new DefaultObjectMapper();
+ objectMapper.registerSubtypes(LinearShardSpec.class);
+ objectMapper.registerModules(new AggregatorsModule());
+ objectMapper.registerModules(new BuiltInTypesModule().getJacksonModules());
+ objectMapper.setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(ExprMacroTable.class.getName(),
TestExprMacroTable.INSTANCE)
+ .addValue(ObjectMapper.class.getName(), objectMapper)
+ );
+ return objectMapper;
+ }
+
+ private static DataSchema makeDataSchema()
+ {
+ return DataSchema.builder()
+ .withDataSource(DATASOURCE)
+ .withTimestamp(new TimestampSpec("ts", "auto", null))
+ .withDimensions(DimensionsSpec.EMPTY)
+ .withAggregators(
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("met", "met")
+ )
+ .withGranularity(new
UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null))
+ .build();
+ }
+
+ private AppenderatorConfig makeTuningConfig()
+ {
+ return new TestAppenderatorConfig(
+ TuningConfig.DEFAULT_APPENDABLE_INDEX,
+ 1,
+ Runtime.getRuntime().totalMemory() / 3,
+ false,
+ IndexSpec.getDefault(),
+ 0,
+ false,
+ 0L,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
+ cacheDir,
+ false
+ );
+ }
+
+ private static DataSegmentPusher makeDataSegmentPusher()
+ {
+ return new DataSegmentPusher()
+ {
+ @Override
+ public DataSegment push(File file, DataSegment segment, boolean
useUniquePath)
+ {
+ return segment;
+ }
+
+ @Override
+ public Map<String, Object> makeLoadSpec(URI uri)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
diff --git a/docs/querying/query-context-reference.md
b/docs/querying/query-context-reference.md
index c485c0231c0..41bd206199e 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -68,7 +68,7 @@ Unless otherwise noted, the following parameters apply to all
query types, and t
|`useFilterCNF`|`false`| If true, Druid will attempt to convert the query
filter to Conjunctive Normal Form (CNF). During query processing, columns can
be pre-filtered by intersecting the bitmap indexes of all values that match the
eligible filters, often greatly reducing the raw number of rows which need to
be scanned. But this effect only happens for the top level filter, or
individual clauses of a top level 'and' filter. As such, filters in CNF
potentially have a higher chance to util [...]
|`secondaryPartitionPruning`|`true`|Enable secondary partition pruning on the
Broker. The Broker will always prune unnecessary segments from the input scan
based on a filter on time intervals, but if the data is further partitioned
with hash or range partitioning, this option will enable additional pruning
based on a filter on secondary partition dimensions.|
|`debug`| `false` | Flag indicating whether to enable debugging outputs for
the query. When set to false, no additional logs will be produced (logs
produced will be entirely dependent on your logging level). When set to true,
the following addition logs will be produced:<br />- Log the stack trace of the
exception (if any) produced by the query |
-|`setProcessingThreadNames`|`true`| Whether processing thread names will be
set to `queryType_dataSource_intervals` while processing a query. This aids in
interpreting thread dumps, and is on by default. Query overhead can be reduced
slightly by setting this to `false`. This has a tiny effect in most scenarios,
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
+|`setProcessingThreadNames`|`false`| Flag indicating whether processing thread
names will be set to `processing_<queryId>` while processing a query. Thread
renaming aids in interpreting thread dumps, but has measurable thread renaming
overhead when segment scans are very quick. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge
two Project operators when inlining expressions causes complexity to increase.
Implemented as a workaround to exception `There are not enough rules to produce
a node with desired properties: convention=DRUID, sort=[]` thrown after
rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should
be queried by brokers. Clone servers are created by the `cloneServers`
Coordinator dynamic configuration. Possible values are `excludeClones`,
`includeClones` and `preferClones`. `excludeClones` means that clone
Historicals are not queried by the broker. `preferClones` indicates that when
given a choice between the clone Historical and the original Historical which
is being cloned, the broker chooses the clones [...]
|`realtimeSegmentsMode` |`include`| Controls whether realtime segments are
queried. `include` queries all segments, including realtime. `exclude` skips
realtime segments. `exclusive` queries only realtime segments. |
@@ -140,4 +140,3 @@ For more information, see the following topics:
- [Set query context](./query-context.md) to learn how to configure query
context parameters.
- [SQL query context](sql-query-context.md) for query context parameters
specific to Druid SQL.
- [SQL-based ingestion
reference](../multi-stage-query/reference/#context-parameters) for context
parameters used in SQL-based ingestion (MSQ).
-
diff --git
a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
index 7f1a37f61e6..f888bf87e47 100644
---
a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
+++
b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
@@ -48,6 +48,7 @@ public class SpecificSegmentQueryRunner<T> implements
QueryRunner<T>
@VisibleForTesting
static final String CTX_SET_THREAD_NAME = "setProcessingThreadNames";
+ static final boolean DEFAULT_SET_THREAD_NAME_ENABLED = false;
public SpecificSegmentQueryRunner(
QueryRunner<T> base,
@@ -68,7 +69,7 @@ public class SpecificSegmentQueryRunner<T> implements
QueryRunner<T>
)
);
- final boolean setName =
input.getQuery().context().getBoolean(CTX_SET_THREAD_NAME, true);
+ final boolean setName =
input.getQuery().context().getBoolean(CTX_SET_THREAD_NAME,
DEFAULT_SET_THREAD_NAME_ENABLED);
final Query<T> query = queryPlus.getQuery();
diff --git
a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
index 5ab3783869e..0101ce990b4 100644
---
a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
@@ -51,6 +51,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
public class SpecificSegmentQueryRunnerTest
{
@@ -182,8 +184,6 @@ public class SpecificSegmentQueryRunnerTest
new CountAggregatorFactory("rows")
)
)
- // Do one test with CTX_SET_THREAD_NAME =
false.
-
.context(ImmutableMap.of(SpecificSegmentQueryRunner.CTX_SET_THREAD_NAME, false))
.build();
Sequence results = queryRunner.run(QueryPlus.wrap(query), responseContext);
List<Result<TimeseriesResultValue>> res = results.toList();
@@ -197,6 +197,101 @@ public class SpecificSegmentQueryRunnerTest
validate(mapper, descriptor, responseContext);
}
+ @Test
+ public void testSetThreadName()
+ {
+ assertThreadNameDuringProcessing(null, "original-test-thread");
+ assertThreadNameDuringProcessing(false, "original-test-thread");
+ assertThreadNameDuringProcessing(true, "processing_thread-name-query");
+ }
+
+ private void assertThreadNameDuringProcessing(
+ final Boolean setProcessingThreadNames,
+ final String expectedThreadNameDuringProcessing
+ )
+ {
+ final String originalThreadName = Thread.currentThread().getName();
+
+ try {
+ Thread.currentThread().setName("original-test-thread");
+
+ final AtomicReference<String> runnerThreadName = new AtomicReference<>();
+ final AtomicReference<String> sequenceThreadName = new
AtomicReference<>();
+ final Result<TimeseriesResultValue> value = makeResult();
+ final SegmentDescriptor descriptor = new SegmentDescriptor(
+ Intervals.of("2012-01-01T00:00:00Z/P1D"),
+ "version",
+ 0
+ );
+
+ final SpecificSegmentQueryRunner<Result<TimeseriesResultValue>>
queryRunner = new SpecificSegmentQueryRunner<>(
+ new QueryRunner<>()
+ {
+ @Override
+ public Sequence<Result<TimeseriesResultValue>> run(
+ QueryPlus<Result<TimeseriesResultValue>> queryPlus,
+ ResponseContext responseContext
+ )
+ {
+ runnerThreadName.set(Thread.currentThread().getName());
+ return Sequences.withEffect(
+ Sequences.simple(Collections.singletonList(value)),
+ () ->
sequenceThreadName.set(Thread.currentThread().getName()),
+ Execs.directExecutor()
+ );
+ }
+ },
+ new SpecificSegmentSpec(descriptor)
+ );
+
+ final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("foo")
+ .granularity(Granularities.ALL)
+
.intervals(ImmutableList.of(Intervals.of("2012-01-01T00:00:00Z/P1D")))
+ .aggregators(
+ ImmutableList.of(
+ new
CountAggregatorFactory("rows")
+ )
+ )
+
.context(makeThreadNameContext(setProcessingThreadNames))
+ .queryId("thread-name-query")
+ .build();
+
+ final Sequence<Result<TimeseriesResultValue>> results = queryRunner.run(
+ QueryPlus.wrap(query),
+ ResponseContext.createEmpty()
+ );
+ results.toList();
+
+ Assertions.assertEquals(expectedThreadNameDuringProcessing,
runnerThreadName.get());
+ Assertions.assertEquals(expectedThreadNameDuringProcessing,
sequenceThreadName.get());
+ Assertions.assertEquals("original-test-thread",
Thread.currentThread().getName());
+ }
+ finally {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }
+
+ private static Map<String, Object> makeThreadNameContext(final Boolean
setProcessingThreadNames)
+ {
+ if (setProcessingThreadNames == null) {
+ return Collections.emptyMap();
+ } else {
+ return ImmutableMap.of(SpecificSegmentQueryRunner.CTX_SET_THREAD_NAME,
setProcessingThreadNames);
+ }
+ }
+
+ private static Result<TimeseriesResultValue> makeResult()
+ {
+ final TimeseriesResultBuilder builder = new TimeseriesResultBuilder(
+ DateTimes.of("2012-01-01T00:00:00Z")
+ );
+ final CountAggregator rows = new CountAggregator();
+ rows.aggregate();
+ builder.addMetric("rows", rows.get());
+ return builder.build();
+ }
+
private void validate(ObjectMapper mapper, SegmentDescriptor descriptor,
ResponseContext responseContext)
throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]