This is an automated email from the ASF dual-hosted git repository.

FrankChen021 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ba191be9f6 perf: disable processing thread renaming by default 
(#19518)
5ba191be9f6 is described below

commit 5ba191be9f689a13a16001b3f1b20cc31bd559dd
Author: Virushade <[email protected]>
AuthorDate: Thu May 28 20:47:03 2026 +0800

    perf: disable processing thread renaming by default (#19518)
    
    * Add default value for thread enabling
    
    * Peon disable thread renaming
    
    * Add benchmark query types
    
    * Add groupby benchmark
    
    * Specify query type
    
    * Docs for thread
---
 .../benchmark/SinkQuerySegmentWalkerBenchmark.java | 358 +++++++++++++++++++--
 docs/querying/query-context-reference.md           |   3 +-
 .../query/spec/SpecificSegmentQueryRunner.java     |   3 +-
 .../query/spec/SpecificSegmentQueryRunnerTest.java |  99 +++++-
 4 files changed, 427 insertions(+), 36 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
index 6b36d72c672..777dd297ecb 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
@@ -19,29 +19,87 @@
 
 package org.apache.druid.benchmark;
 
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.MapCache;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.BuiltInTypesModule;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.jackson.AggregatorsModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.core.LoggingEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
-import org.apache.druid.query.Result;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
+import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
+import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
+import org.apache.druid.query.metadata.metadata.ListColumnIncluderator;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
-import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
+import org.apache.druid.segment.realtime.appenderator.TestAppenderatorConfig;
 import org.apache.druid.segment.realtime.sink.Committers;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
+import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -59,8 +117,11 @@ import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
 import java.io.File;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 @State(Scope.Benchmark)
@@ -71,6 +132,18 @@ import java.util.concurrent.TimeUnit;
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class SinkQuerySegmentWalkerBenchmark
 {
+  private static final String DATASOURCE = "foo";
+  private static final List<String> QUERY_COLUMNS = ImmutableList.of("__time", 
"dim", "count", "met");
+  private static final MultipleIntervalSegmentSpec QUERY_INTERVALS =
+      new 
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/2001")));
+  private static final String SET_PROCESSING_THREAD_NAMES = 
"setProcessingThreadNames";
+
+  @Param({"timeseries", "scan", "segmentMetadata", "groupBy"})
+  private String queryType;
+
+  @Param({"false", "true"})
+  private boolean setProcessingThreadNames;
+
   @Param({"10", "50", "100", "200"})
   private int numFireHydrants;
 
@@ -78,24 +151,66 @@ public class SinkQuerySegmentWalkerBenchmark
   private final ServiceEmitter serviceEmitter = new ServiceEmitter("test", 
"test", loggingEmitter);
   private File cacheDir;
 
+  private ExecutorService queryExecutor;
   private Appenderator appenderator;
+  private TestGroupByBuffers groupByBuffers;
 
   @Setup(Level.Trial)
   public void setup() throws Exception
   {
     final String userConfiguredCacheDir = 
System.getProperty("druid.benchmark.cacheDir", 
System.getenv("DRUID_BENCHMARK_CACHE_DIR"));
     cacheDir = new File(userConfiguredCacheDir);
-    final StreamAppenderatorTester tester =
-        new StreamAppenderatorTester.Builder().maxRowsInMemory(1)
-                                              .basePersistDirectory(cacheDir)
-                                              
.withServiceEmitter(serviceEmitter)
-                                              .build();
+    FileUtils.deleteDirectory(cacheDir);
+    final ObjectMapper objectMapper = makeObjectMapper();
+    final IndexIO indexIO = new IndexIO(
+        objectMapper,
+        new ColumnConfig()
+        {
+        }
+    );
+    final IndexMergerV9 indexMerger = new IndexMergerV9(
+        objectMapper,
+        indexIO,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+    );
+    final DataSchema schema = makeDataSchema();
+    final RowIngestionMeters rowIngestionMeters = new 
SimpleRowIngestionMeters();
+    final AppenderatorConfig tuningConfig = makeTuningConfig();
+
+    queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
+    groupByBuffers = TestGroupByBuffers.createDefault();
 
-    appenderator = tester.getAppenderator();
+    serviceEmitter.start();
+    EmittingLogger.registerEmitter(serviceEmitter);
+
+    final QueryRunnerFactoryConglomerate conglomerate = 
makeQueryRunnerFactoryConglomerate();
+    appenderator = Appenderators.createRealtime(
+        null,
+        schema.getDataSource(),
+        schema,
+        tuningConfig,
+        new SegmentGenerationMetrics(),
+        makeDataSegmentPusher(),
+        objectMapper,
+        indexIO,
+        indexMerger,
+        conglomerate,
+        new NoopDataSegmentAnnouncer(),
+        serviceEmitter,
+        new ForwardingQueryProcessingPool(queryExecutor),
+        MapCache.create(2048),
+        new CacheConfig(),
+        new CachePopulatorStats(),
+        NoopPolicyEnforcer.instance(),
+        rowIngestionMeters,
+        new ParseExceptionHandler(rowIngestionMeters, false, 
Integer.MAX_VALUE, 0),
+        CentralizedDatasourceSchemaConfig.create(),
+        interval -> {}
+    );
     appenderator.startJob();
 
     final SegmentIdWithShardSpec segmentIdWithShardSpec = new 
SegmentIdWithShardSpec(
-        StreamAppenderatorTester.DATASOURCE,
+        DATASOURCE,
         Intervals.of("2000/2001"),
         "A",
         new LinearShardSpec(0)
@@ -119,33 +234,214 @@ public class SinkQuerySegmentWalkerBenchmark
   @TearDown(Level.Trial)
   public void tearDown() throws Exception
   {
-    appenderator.close();
-    FileUtils.deleteDirectory(cacheDir);
+    try {
+      if (appenderator != null) {
+        appenderator.close();
+      }
+    }
+    finally {
+      if (queryExecutor != null) {
+        queryExecutor.shutdownNow();
+      }
+      try {
+        if (groupByBuffers != null) {
+          groupByBuffers.close();
+        }
+      }
+      finally {
+        FileUtils.deleteDirectory(cacheDir);
+      }
+    }
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public void emitSinkMetrics(Blackhole blackhole) throws Exception
+  public void runSinkQuery(Blackhole blackhole) throws Exception
   {
-    {
-      final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
-                                           
.dataSource(StreamAppenderatorTester.DATASOURCE)
-                                           
.intervals(ImmutableList.of(Intervals.of("2000/2001")))
-                                           .aggregators(
-                                               Arrays.asList(
-                                                   new 
LongSumAggregatorFactory("count", "count"),
-                                                   new 
LongSumAggregatorFactory("met", "met")
-                                               )
-                                           )
-                                           .granularity(Granularities.DAY)
-                                           .build();
-
-      final List<Result<TimeseriesResultValue>> results =
-          QueryPlus.wrap(query1).run(appenderator, 
ResponseContext.createEmpty()).toList();
-      blackhole.consume(results);
-
-      serviceEmitter.flush();
+    final Query<?> query = makeQuery();
+    final List<?> results = QueryPlus.wrap(query).run(appenderator, 
ResponseContext.createEmpty()).toList();
+    blackhole.consume(results);
+
+    serviceEmitter.flush();
+  }
+
+  private Query<?> makeQuery()
+  {
+    switch (queryType) {
+      case "timeseries":
+        return makeTimeseriesQuery();
+      case "scan":
+        return makeScanQuery();
+      case "segmentMetadata":
+        return makeSegmentMetadataQuery();
+      case "groupBy":
+        return makeGroupByQuery();
+      default:
+        throw new IllegalStateException("Unsupported query type[" + queryType 
+ "]");
     }
   }
+
+  private QueryRunnerFactoryConglomerate makeQueryRunnerFactoryConglomerate()
+  {
+    return DefaultQueryRunnerFactoryConglomerate.buildFromQueryRunnerFactories(
+        ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
+                    .put(
+                        TimeseriesQuery.class,
+                        new TimeseriesQueryRunnerFactory(
+                            new TimeseriesQueryQueryToolChest(),
+                            new TimeseriesQueryEngine(),
+                            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+                        )
+                    )
+                    .put(
+                        ScanQuery.class,
+                        new ScanQueryRunnerFactory(
+                            new 
ScanQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
+                            new ScanQueryEngine(),
+                            new ScanQueryConfig()
+                        )
+                    )
+                    .put(
+                        SegmentMetadataQuery.class,
+                        new SegmentMetadataQueryRunnerFactory(
+                            new SegmentMetadataQueryQueryToolChest(new 
SegmentMetadataQueryConfig()),
+                            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+                        )
+                    )
+                    .put(
+                        GroupByQuery.class,
+                        GroupByQueryRunnerTest.makeQueryRunnerFactory(new 
GroupByQueryConfig(), groupByBuffers)
+                    )
+                    .build()
+    );
+  }
+
+  private TimeseriesQuery makeTimeseriesQuery()
+  {
+    return Druids.newTimeseriesQueryBuilder()
+                 .dataSource(DATASOURCE)
+                 .intervals(QUERY_INTERVALS)
+                 .aggregators(makeAggregators())
+                 .granularity(Granularities.DAY)
+                 .context(makeQueryContext())
+                 .build();
+  }
+
+  private ScanQuery makeScanQuery()
+  {
+    return Druids.newScanQueryBuilder()
+                 .dataSource(DATASOURCE)
+                 .intervals(QUERY_INTERVALS)
+                 .columns(QUERY_COLUMNS)
+                 
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                 .context(makeQueryContext())
+                 .build();
+  }
+
+  private SegmentMetadataQuery makeSegmentMetadataQuery()
+  {
+    return Druids.newSegmentMetadataQueryBuilder()
+                 .dataSource(DATASOURCE)
+                 .intervals(QUERY_INTERVALS)
+                 .toInclude(new ListColumnIncluderator(QUERY_COLUMNS))
+                 .analysisTypes(
+                     SegmentMetadataQuery.AnalysisType.CARDINALITY,
+                     SegmentMetadataQuery.AnalysisType.SIZE,
+                     SegmentMetadataQuery.AnalysisType.INTERVAL,
+                     SegmentMetadataQuery.AnalysisType.MINMAX,
+                     SegmentMetadataQuery.AnalysisType.AGGREGATORS
+                 )
+                 .merge(true)
+                 .context(makeQueryContext())
+                 .build();
+  }
+
+  private GroupByQuery makeGroupByQuery()
+  {
+    return GroupByQuery.builder()
+                       .setDataSource(DATASOURCE)
+                       .setInterval("2000/2001")
+                       .setGranularity(Granularities.ALL)
+                       .setAggregatorSpecs(makeAggregators())
+                       .setContext(makeQueryContext())
+                       .build();
+  }
+
+  private List<AggregatorFactory> makeAggregators()
+  {
+    return Arrays.asList(
+        new LongSumAggregatorFactory("count", "count"),
+        new LongSumAggregatorFactory("met", "met")
+    );
+  }
+
+  private Map<String, Object> makeQueryContext()
+  {
+    return ImmutableMap.of(SET_PROCESSING_THREAD_NAMES, 
setProcessingThreadNames);
+  }
+
+  private static ObjectMapper makeObjectMapper()
+  {
+    final ObjectMapper objectMapper = new DefaultObjectMapper();
+    objectMapper.registerSubtypes(LinearShardSpec.class);
+    objectMapper.registerModules(new AggregatorsModule());
+    objectMapper.registerModules(new BuiltInTypesModule().getJacksonModules());
+    objectMapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(ExprMacroTable.class.getName(), 
TestExprMacroTable.INSTANCE)
+            .addValue(ObjectMapper.class.getName(), objectMapper)
+    );
+    return objectMapper;
+  }
+
+  private static DataSchema makeDataSchema()
+  {
+    return DataSchema.builder()
+                     .withDataSource(DATASOURCE)
+                     .withTimestamp(new TimestampSpec("ts", "auto", null))
+                     .withDimensions(DimensionsSpec.EMPTY)
+                     .withAggregators(
+                         new CountAggregatorFactory("count"),
+                         new LongSumAggregatorFactory("met", "met")
+                     )
+                     .withGranularity(new 
UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null))
+                     .build();
+  }
+
+  private AppenderatorConfig makeTuningConfig()
+  {
+    return new TestAppenderatorConfig(
+        TuningConfig.DEFAULT_APPENDABLE_INDEX,
+        1,
+        Runtime.getRuntime().totalMemory() / 3,
+        false,
+        IndexSpec.getDefault(),
+        0,
+        false,
+        0L,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
+        cacheDir,
+        false
+    );
+  }
+
+  private static DataSegmentPusher makeDataSegmentPusher()
+  {
+    return new DataSegmentPusher()
+    {
+      @Override
+      public DataSegment push(File file, DataSegment segment, boolean 
useUniquePath)
+      {
+        return segment;
+      }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
 }
diff --git a/docs/querying/query-context-reference.md 
b/docs/querying/query-context-reference.md
index c485c0231c0..41bd206199e 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -68,7 +68,7 @@ Unless otherwise noted, the following parameters apply to all 
query types, and t
 |`useFilterCNF`|`false`| If true, Druid will attempt to convert the query 
filter to Conjunctive Normal Form (CNF). During query processing, columns can 
be pre-filtered by intersecting the bitmap indexes of all values that match the 
eligible filters, often greatly reducing the raw number of rows which need to 
be scanned. But this effect only happens for the top level filter, or 
individual clauses of a top level 'and' filter. As such, filters in CNF 
potentially have a higher chance to util [...]
 |`secondaryPartitionPruning`|`true`|Enable secondary partition pruning on the 
Broker. The Broker will always prune unnecessary segments from the input scan 
based on a filter on time intervals, but if the data is further partitioned 
with hash or range partitioning, this option will enable additional pruning 
based on a filter on secondary partition dimensions.|
 |`debug`| `false` | Flag indicating whether to enable debugging outputs for 
the query. When set to false, no additional logs will be produced (logs 
produced will be entirely dependent on your logging level). When set to true, 
the following addition logs will be produced:<br />- Log the stack trace of the 
exception (if any) produced by the query |
-|`setProcessingThreadNames`|`true`| Whether processing thread names will be 
set to `queryType_dataSource_intervals` while processing a query. This aids in 
interpreting thread dumps, and is on by default. Query overhead can be reduced 
slightly by setting this to `false`. This has a tiny effect in most scenarios, 
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
+|`setProcessingThreadNames`|`false`| Flag indicating whether processing thread 
names will be set to `processing_<queryId>` while processing a query. Thread 
renaming aids in interpreting thread dumps, but has measurable thread renaming 
overhead when segment scans are very quick. |
 |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge 
two Project operators when inlining expressions causes complexity to increase. 
Implemented as a workaround to exception `There are not enough rules to produce 
a node with desired properties: convention=DRUID, sort=[]` thrown after 
rejecting the merge of two projects.|
 |`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should 
be queried by brokers. Clone servers are created by the `cloneServers` 
Coordinator dynamic configuration. Possible values are `excludeClones`, 
`includeClones` and `preferClones`. `excludeClones` means that clone 
Historicals are not queried by the broker. `preferClones` indicates that when 
given a choice between the clone Historical and the original Historical which 
is being cloned, the broker chooses the clones [...]
 |`realtimeSegmentsMode` |`include`| Controls whether realtime segments are 
queried. `include` queries all segments, including realtime. `exclude` skips 
realtime segments. `exclusive` queries only realtime segments. |
@@ -140,4 +140,3 @@ For more information, see the following topics:
 - [Set query context](./query-context.md) to learn how to configure query 
context parameters.
 - [SQL query context](sql-query-context.md) for query context parameters 
specific to Druid SQL.
 - [SQL-based ingestion 
reference](../multi-stage-query/reference/#context-parameters) for context 
parameters used in SQL-based ingestion (MSQ).
-
diff --git 
a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
 
b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
index 7f1a37f61e6..f888bf87e47 100644
--- 
a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
+++ 
b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java
@@ -48,6 +48,7 @@ public class SpecificSegmentQueryRunner<T> implements 
QueryRunner<T>
 
   @VisibleForTesting
   static final String CTX_SET_THREAD_NAME = "setProcessingThreadNames";
+  static final boolean DEFAULT_SET_THREAD_NAME_ENABLED = false;
 
   public SpecificSegmentQueryRunner(
       QueryRunner<T> base,
@@ -68,7 +69,7 @@ public class SpecificSegmentQueryRunner<T> implements 
QueryRunner<T>
         )
     );
 
-    final boolean setName = 
input.getQuery().context().getBoolean(CTX_SET_THREAD_NAME, true);
+    final boolean setName = 
input.getQuery().context().getBoolean(CTX_SET_THREAD_NAME, 
DEFAULT_SET_THREAD_NAME_ENABLED);
 
     final Query<T> query = queryPlus.getQuery();
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
index 5ab3783869e..0101ce990b4 100644
--- 
a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java
@@ -51,6 +51,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class SpecificSegmentQueryRunnerTest
 {
@@ -182,8 +184,6 @@ public class SpecificSegmentQueryRunnerTest
                                           new CountAggregatorFactory("rows")
                                       )
                                   )
-                                  // Do one test with CTX_SET_THREAD_NAME = 
false.
-                                  
.context(ImmutableMap.of(SpecificSegmentQueryRunner.CTX_SET_THREAD_NAME, false))
                                   .build();
     Sequence results = queryRunner.run(QueryPlus.wrap(query), responseContext);
     List<Result<TimeseriesResultValue>> res = results.toList();
@@ -197,6 +197,101 @@ public class SpecificSegmentQueryRunnerTest
     validate(mapper, descriptor, responseContext);
   }
 
+  @Test
+  public void testSetThreadName()
+  {
+    assertThreadNameDuringProcessing(null, "original-test-thread");
+    assertThreadNameDuringProcessing(false, "original-test-thread");
+    assertThreadNameDuringProcessing(true, "processing_thread-name-query");
+  }
+
+  private void assertThreadNameDuringProcessing(
+      final Boolean setProcessingThreadNames,
+      final String expectedThreadNameDuringProcessing
+  )
+  {
+    final String originalThreadName = Thread.currentThread().getName();
+
+    try {
+      Thread.currentThread().setName("original-test-thread");
+
+      final AtomicReference<String> runnerThreadName = new AtomicReference<>();
+      final AtomicReference<String> sequenceThreadName = new 
AtomicReference<>();
+      final Result<TimeseriesResultValue> value = makeResult();
+      final SegmentDescriptor descriptor = new SegmentDescriptor(
+          Intervals.of("2012-01-01T00:00:00Z/P1D"),
+          "version",
+          0
+      );
+
+      final SpecificSegmentQueryRunner<Result<TimeseriesResultValue>> 
queryRunner = new SpecificSegmentQueryRunner<>(
+          new QueryRunner<>()
+          {
+            @Override
+            public Sequence<Result<TimeseriesResultValue>> run(
+                QueryPlus<Result<TimeseriesResultValue>> queryPlus,
+                ResponseContext responseContext
+            )
+            {
+              runnerThreadName.set(Thread.currentThread().getName());
+              return Sequences.withEffect(
+                  Sequences.simple(Collections.singletonList(value)),
+                  () -> 
sequenceThreadName.set(Thread.currentThread().getName()),
+                  Execs.directExecutor()
+              );
+            }
+          },
+          new SpecificSegmentSpec(descriptor)
+      );
+
+      final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                          .dataSource("foo")
+                                          .granularity(Granularities.ALL)
+                                          
.intervals(ImmutableList.of(Intervals.of("2012-01-01T00:00:00Z/P1D")))
+                                          .aggregators(
+                                              ImmutableList.of(
+                                                  new 
CountAggregatorFactory("rows")
+                                              )
+                                          )
+                                          
.context(makeThreadNameContext(setProcessingThreadNames))
+                                          .queryId("thread-name-query")
+                                          .build();
+
+      final Sequence<Result<TimeseriesResultValue>> results = queryRunner.run(
+          QueryPlus.wrap(query),
+          ResponseContext.createEmpty()
+      );
+      results.toList();
+
+      Assertions.assertEquals(expectedThreadNameDuringProcessing, 
runnerThreadName.get());
+      Assertions.assertEquals(expectedThreadNameDuringProcessing, 
sequenceThreadName.get());
+      Assertions.assertEquals("original-test-thread", 
Thread.currentThread().getName());
+    }
+    finally {
+      Thread.currentThread().setName(originalThreadName);
+    }
+  }
+
+  private static Map<String, Object> makeThreadNameContext(final Boolean 
setProcessingThreadNames)
+  {
+    if (setProcessingThreadNames == null) {
+      return Collections.emptyMap();
+    } else {
+      return ImmutableMap.of(SpecificSegmentQueryRunner.CTX_SET_THREAD_NAME, 
setProcessingThreadNames);
+    }
+  }
+
+  private static Result<TimeseriesResultValue> makeResult()
+  {
+    final TimeseriesResultBuilder builder = new TimeseriesResultBuilder(
+        DateTimes.of("2012-01-01T00:00:00Z")
+    );
+    final CountAggregator rows = new CountAggregator();
+    rows.aggregate();
+    builder.addMetric("rows", rows.get());
+    return builder.build();
+  }
+
   private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, 
ResponseContext responseContext)
       throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to