This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b43c91411c Remove cpuTimeAcc from 
DataSource#createSegmentMapFunction's signature (#17623)
5b43c91411c is described below

commit 5b43c91411cd5bacb000bc4b0386995f2c8834aa
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Feb 12 09:36:13 2025 +0100

    Remove cpuTimeAcc from DataSource#createSegmentMapFunction's signature 
(#17623)
    
    This patch removes cpuTimeAcc from DataSource#createSegmentMapFunction 
signature.
    
    the methods being executed don't need to know internally that they are 
being measured; passing the accumulator also complicates the code flow a bit - 
as it tries to avoid double counting.
---
 .../querykit/BaseLeafFrameProcessorFactory.java    |   3 +-
 .../BroadcastJoinSegmentMapFnProcessor.java        |   2 +-
 .../druid/msq/querykit/InputNumberDataSource.java  |   3 +-
 .../msq/querykit/SimpleSegmentMapFnProcessor.java  |   2 +-
 .../java/org/apache/druid/query/DataSource.java    |   3 +-
 .../org/apache/druid/query/FilteredDataSource.java |  18 +--
 .../druid/query/FrameBasedInlineDataSource.java    |   3 +-
 .../org/apache/druid/query/InlineDataSource.java   |   6 +-
 .../org/apache/druid/query/JoinDataSource.java     | 150 ++++++++++-----------
 .../org/apache/druid/query/LookupDataSource.java   |   6 +-
 .../org/apache/druid/query/QueryDataSource.java    |   6 +-
 .../apache/druid/query/RestrictedDataSource.java   |  17 +--
 .../org/apache/druid/query/TableDataSource.java    |   6 +-
 .../org/apache/druid/query/UnionDataSource.java    |   6 +-
 .../org/apache/druid/query/UnnestDataSource.java   |  14 +-
 .../org/apache/druid/query/union/UnionQuery.java   |   3 +-
 .../apache/druid/query/QueryDataSourceTest.java    |   9 +-
 .../apache/druid/query/QueryRunnerTestHelper.java  |   3 +-
 .../apache/druid/segment/join/NoopDataSource.java  |   6 +-
 .../appenderator/SinkQuerySegmentWalker.java       |  14 +-
 .../druid/server/LocalQuerySegmentWalker.java      |   6 +-
 .../druid/server/coordination/ServerManager.java   |   8 +-
 .../server/TestClusterQuerySegmentWalker.java      |   4 +-
 .../sql/calcite/external/ExternalDataSource.java   |   4 +-
 24 files changed, 114 insertions(+), 188 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index 013b6d4c93c..3e6db775393 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -58,7 +58,6 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -163,7 +162,7 @@ public abstract class BaseLeafFrameProcessorFactory extends 
BaseFrameProcessorFa
 
     if (segmentMapFnProcessor == null) {
       final Function<SegmentReference, SegmentReference> segmentMapFn =
-          query.getDataSource().createSegmentMapFunction(query, new 
AtomicLong());
+          query.getDataSource().createSegmentMapFunction(query);
       processorManager = 
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
     } else {
       processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(() 
-> segmentMapFnProcessor), processorManagerFn);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
index b47a450aa00..d3ce49d5f8e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
@@ -193,7 +193,7 @@ public class BroadcastJoinSegmentMapFnProcessor implements 
FrameProcessor<Functi
 
   private Function<SegmentReference, SegmentReference> 
createSegmentMapFunction()
   {
-    return 
inlineChannelData(query.getDataSource()).createSegmentMapFunction(query, new 
AtomicLong());
+    return 
inlineChannelData(query.getDataSource()).createSegmentMapFunction(query);
   }
 
   DataSource inlineChannelData(final DataSource originalDataSource)
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
index ccd3aef7573..05560796435 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -97,7 +96,7 @@ public class InputNumberDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
index 22ee0dd7fb4..9e5a8964bf2 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
@@ -64,7 +64,7 @@ public class SimpleSegmentMapFnProcessor implements 
FrameProcessor<Function<Segm
   @Override
   public ReturnOrAwait<Function<SegmentReference, SegmentReference>> 
runIncrementally(final IntSet readableInputs)
   {
-    return 
ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query,
 new AtomicLong()));
+    return 
ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query));
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java 
b/processing/src/main/java/org/apache/druid/query/DataSource.java
index 7c5f52d08fe..3f140ffc302 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -113,7 +112,7 @@ public interface DataSource
    * @param cpuTimeAcc the cpu time accumulator
    * @return the segment function
    */
-  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query 
query, AtomicLong cpuTimeAcc);
+  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query 
query);
 
   /**
    * Returns an updated datasource based on the specified new source.
diff --git 
a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java 
b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
index 1644b3218d3..0716fd156ba 100644
--- a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
@@ -27,13 +27,12 @@ import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.FilteredSegment;
 import org.apache.druid.segment.SegmentReference;
-import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -122,19 +121,10 @@ public class FilteredDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAccumulator
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
-    final Function<SegmentReference, SegmentReference> segmentMapFn = 
base.createSegmentMapFunction(
-        query,
-        cpuTimeAccumulator
-    );
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> baseSegment -> new 
FilteredSegment(segmentMapFn.apply(baseSegment), filter)
-    );
+    final Function<SegmentReference, SegmentReference> segmentMapFn = 
base.createSegmentMapFunction(query);
+    return baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), 
filter);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
 
b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
index 68d3db9a36d..060c06f3adf 100644
--- 
a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
+++ 
b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
@@ -37,7 +37,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -171,7 +170,7 @@ public class FrameBasedInlineDataSource implements 
DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java 
b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
index a14eb63fe6f..1326938eddc 100644
--- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
@@ -39,7 +39,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -243,10 +242,7 @@ public class InlineDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAcc
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java 
b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
index 9662a99ab81..f4ef4cc3fa8 100644
--- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -54,9 +54,9 @@ import 
org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
 import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
 import org.apache.druid.segment.join.filter.JoinableClauses;
 import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
-import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,7 +65,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -301,14 +300,12 @@ public class JoinDataSource implements DataSource
 
   @Override
   public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAccumulator
+      Query query
   )
   {
     return createSegmentMapFunctionInternal(
         analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
         analysis.getPreJoinableClauses(),
-        cpuTimeAccumulator,
         analysis.getBaseQuery().orElse(query)
     );
   }
@@ -444,86 +441,79 @@ public class JoinDataSource implements DataSource
   private Function<SegmentReference, SegmentReference> 
createSegmentMapFunctionInternal(
       @Nullable final Filter baseFilter,
       final List<PreJoinableClause> clauses,
-      final AtomicLong cpuTimeAccumulator,
       final Query<?> query
   )
   {
     // compute column correlations here and RHS correlated values
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> {
-          if (clauses.isEmpty()) {
-            return Function.identity();
-          } else {
-            final JoinableClauses joinableClauses = 
JoinableClauses.createClauses(
-                clauses,
-                joinableFactoryWrapper.getJoinableFactory()
-            );
-            final JoinFilterRewriteConfig filterRewriteConfig = 
JoinFilterRewriteConfig.forQuery(query);
-
-            // Pick off any join clauses that can be converted into filters.
-            final Set<String> requiredColumns = query.getRequiredColumns();
-            final Filter baseFilterToUse;
-            final List<JoinableClause> clausesToUse;
-
-            if (requiredColumns != null && 
filterRewriteConfig.isEnableRewriteJoinToFilter()) {
-              final Pair<List<Filter>, List<JoinableClause>> conversionResult 
= JoinableFactoryWrapper.convertJoinsToFilters(
-                  joinableClauses.getJoinableClauses(),
-                  requiredColumns,
-                  
Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), 
Integer.MAX_VALUE))
-              );
-
-              baseFilterToUse =
-                  Filters.maybeAnd(
-                      Lists.newArrayList(
-                          Iterables.concat(
-                              Collections.singleton(baseFilter),
-                              conversionResult.lhs
-                          )
-                      )
-                  ).orElse(null);
-              clausesToUse = conversionResult.rhs;
-            } else {
-              baseFilterToUse = baseFilter;
-              clausesToUse = joinableClauses.getJoinableClauses();
-            }
-
-            // Analyze remaining join clauses to see if filters on them can be 
pushed down.
-            final JoinFilterPreAnalysis joinFilterPreAnalysis = 
JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
-                new JoinFilterPreAnalysisKey(
-                    filterRewriteConfig,
-                    clausesToUse,
-                    query.getVirtualColumns(),
-                    Filters.maybeAnd(Arrays.asList(baseFilterToUse, 
Filters.toFilter(query.getFilter())))
-                           .orElse(null)
+    if (clauses.isEmpty()) {
+      return Function.identity();
+    } else {
+      final JoinableClauses joinableClauses = JoinableClauses.createClauses(
+          clauses,
+          joinableFactoryWrapper.getJoinableFactory()
+      );
+      final JoinFilterRewriteConfig filterRewriteConfig = 
JoinFilterRewriteConfig.forQuery(query);
+
+      // Pick off any join clauses that can be converted into filters.
+      final Set<String> requiredColumns = query.getRequiredColumns();
+      final Filter baseFilterToUse;
+      final List<JoinableClause> clausesToUse;
+
+      if (requiredColumns != null && 
filterRewriteConfig.isEnableRewriteJoinToFilter()) {
+        final Pair<List<Filter>, List<JoinableClause>> conversionResult = 
JoinableFactoryWrapper.convertJoinsToFilters(
+            joinableClauses.getJoinableClauses(),
+            requiredColumns,
+            
Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), 
Integer.MAX_VALUE))
+        );
+
+        baseFilterToUse =
+            Filters.maybeAnd(
+                Lists.newArrayList(
+                    Iterables.concat(
+                        Collections.singleton(baseFilter),
+                        conversionResult.lhs
+                    )
                 )
-            );
-            final Function<SegmentReference, SegmentReference> baseMapFn;
-            // A join data source is not concrete
-            // And isConcrete() of an unnest datasource delegates to its base
-            // Hence, in the case of a Join -> Unnest -> Join
-            // if we just use isConcrete on the left
-            // the segment map function for the unnest would never get called
-            // This calls us to delegate to the segmentMapFunction of the left
-            // only when it is not a JoinDataSource
-            if (left instanceof JoinDataSource) {
-              baseMapFn = Function.identity();
-            } else {
-              baseMapFn = left.createSegmentMapFunction(
-                  query,
-                  cpuTimeAccumulator
-              );
-            }
-            return baseSegment ->
-                new HashJoinSegment(
-                    baseMapFn.apply(baseSegment),
-                    baseFilterToUse,
-                    GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
-                    joinFilterPreAnalysis
-                );
-          }
-        }
-    );
+            ).orElse(null);
+        clausesToUse = conversionResult.rhs;
+      } else {
+        baseFilterToUse = baseFilter;
+        clausesToUse = joinableClauses.getJoinableClauses();
+      }
+
+      // Analyze remaining join clauses to see if filters on them can be 
pushed down.
+      final JoinFilterPreAnalysis joinFilterPreAnalysis = 
JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
+          new JoinFilterPreAnalysisKey(
+              filterRewriteConfig,
+              clausesToUse,
+              query.getVirtualColumns(),
+              Filters.maybeAnd(Arrays.asList(baseFilterToUse, 
Filters.toFilter(query.getFilter())))
+                     .orElse(null)
+          )
+      );
+      final Function<SegmentReference, SegmentReference> baseMapFn;
+      // A join data source is not concrete
+      // And isConcrete() of an unnest datasource delegates to its base
+      // Hence, in the case of a Join -> Unnest -> Join
+      // if we just use isConcrete on the left
+      // the segment map function for the unnest would never get called
+      // This calls us to delegate to the segmentMapFunction of the left
+      // only when it is not a JoinDataSource
+      if (left instanceof JoinDataSource) {
+        baseMapFn = Function.identity();
+      } else {
+        baseMapFn = left.createSegmentMapFunction(
+            query
+        );
+      }
+      return baseSegment ->
+          new HashJoinSegment(
+              baseMapFn.apply(baseSegment),
+              baseFilterToUse,
+              GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
+              joinFilterPreAnalysis
+          );
+    }
   }
 
   /**
diff --git 
a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java 
b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
index 873ca889321..84a23cc0282 100644
--- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
@@ -30,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -101,10 +100,7 @@ public class LookupDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java 
b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
index 08dc44126fe..35f548def51 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 @JsonTypeName("query")
@@ -110,12 +109,11 @@ public class QueryDataSource implements DataSource
 
   @Override
   public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
+      Query query
   )
   {
     final Query<?> subQuery = this.getQuery();
-    return subQuery.getDataSource().createSegmentMapFunction(subQuery, 
cpuTime);
+    return subQuery.getDataSource().createSegmentMapFunction(subQuery);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java 
b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
index f7f91072fc6..7f1303b4a47 100644
--- a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
@@ -29,14 +29,11 @@ import org.apache.druid.query.policy.NoRestrictionPolicy;
 import org.apache.druid.query.policy.Policy;
 import org.apache.druid.segment.RestrictedSegment;
 import org.apache.druid.segment.SegmentReference;
-import org.apache.druid.utils.JvmUtils;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -125,18 +122,10 @@ public class RestrictedDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAccumulator
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> base.createSegmentMapFunction(
-            query,
-            cpuTimeAccumulator
-        ).andThen((segment) -> (new RestrictedSegment(segment, policy)))
-    );
+    final Function<SegmentReference, SegmentReference> segmentMapFn = 
base.createSegmentMapFunction(query);
+    return baseSegment -> new 
RestrictedSegment(segmentMapFn.apply(baseSegment), policy);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/TableDataSource.java 
b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
index d735a75928d..22f6fb7a975 100644
--- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 @JsonTypeName("table")
@@ -101,10 +100,7 @@ public class TableDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java 
b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
index 0384ecbc885..3673608c0c8 100644
--- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
@@ -33,7 +33,6 @@ import org.apache.druid.utils.CollectionUtils;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -144,10 +143,7 @@ public class UnionDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java 
b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
index 06acca13a75..b6b89823fe5 100644
--- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
@@ -28,13 +28,12 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.UnnestSegment;
 import org.apache.druid.segment.VirtualColumn;
-import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -135,18 +134,13 @@ public class UnnestDataSource implements DataSource
 
   @Override
   public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAccumulator
+      Query query
   )
   {
     final Function<SegmentReference, SegmentReference> segmentMapFn = 
base.createSegmentMapFunction(
-        query,
-        cpuTimeAccumulator
-    );
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> baseSegment -> new 
UnnestSegment(segmentMapFn.apply(baseSegment), virtualColumn, unnestFilter)
+        query
     );
+    return baseSegment -> new UnnestSegment(segmentMapFn.apply(baseSegment), 
virtualColumn, unnestFilter);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java 
b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
index a999a23fde2..2d16f674e67 100644
--- a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
@@ -49,7 +49,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 public class UnionQuery implements Query<Object>
@@ -271,7 +270,7 @@ public class UnionQuery implements Query<Object>
     }
 
     @Override
-    public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+    public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
     {
       throw methodNotSupported();
     }
diff --git 
a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java 
b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
index 96d8ddd05a5..cde52961abc 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 public class QueryDataSourceTest
@@ -174,17 +173,15 @@ public class QueryDataSourceTest
   public void test_withSegmentMapFunction()
   {
     Function<SegmentReference, SegmentReference> parentsegmentMapFunction = 
queryDataSource.createSegmentMapFunction(
-        groupByQuery,
-        new AtomicLong()
+        groupByQuery
     );
 
     Function<SegmentReference, SegmentReference> childsegmentMapFunction = 
queryOnTableDataSource.createSegmentMapFunction(
-        groupByQuery,
-        new AtomicLong()
+        groupByQuery
     );
     // The segment functions should both be identity functions and equal
     Assert.assertEquals(parentsegmentMapFunction, childsegmentMapFunction);
   }
 
-  
+
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java 
b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index 120ef503b8a..1d0a21061df 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -79,7 +79,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -559,7 +558,7 @@ public class QueryRunnerTestHelper
   {
     final DataSource base = query.getDataSource();
 
-    final SegmentReference segmentReference = 
base.createSegmentMapFunction(query, new AtomicLong())
+    final SegmentReference segmentReference = 
base.createSegmentMapFunction(query)
                                                   
.apply(ReferenceCountingSegment.wrapRootGenerationSegment(adapter));
     return makeQueryRunner(factory, segmentReference, runnerName);
   }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java 
b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
index c211137c85e..70e5b110de4 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
@@ -27,7 +27,6 @@ import org.apache.druid.segment.SegmentReference;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -72,10 +71,7 @@ public class NoopDataSource implements DataSource
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
-  )
+  public Function<SegmentReference, SegmentReference> 
createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index c256e82c6d2..6460f51ca57 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -75,6 +75,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.IntegerPartitionChunk;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.utils.CloseableUtils;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -201,11 +202,10 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn =
-        dataSourceFromQuery.createSegmentMapFunction(
-            query,
-            cpuTimeAccumulator
-        );
+    final Function<SegmentReference, SegmentReference> segmentMapFn = 
JvmUtils.safeAccumulateThreadCpuTime(
+        cpuTimeAccumulator,
+        () -> dataSourceFromQuery.createSegmentMapFunction(query)
+    );
 
     // We compute the join cache key here itself so it doesn't need to be 
re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = 
Optional.ofNullable(query.getDataSource().getCacheKey());
@@ -453,7 +453,7 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
-  
+
   /**
    * This class is responsible for emitting query/segment/time, 
query/wait/time and query/segmentAndCache/Time metrics for a Sink.
    * It accumulates query/segment/time and query/segmentAndCache/time metric 
for each FireHydrant at the level of Sink.
@@ -599,7 +599,7 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
       }
     }
   }
-  
+
   private static class SinkHolder implements Overshadowable<SinkHolder>
   {
     private final Sink sink;
diff --git 
a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java 
b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index ae6b67deb5f..9d036fb47c3 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -96,11 +96,7 @@ public class LocalQuerySegmentWalker implements 
QuerySegmentWalker
     final AtomicLong cpuAccumulator = new AtomicLong(0L);
 
     final Function<SegmentReference, SegmentReference> segmentMapFn = 
dataSourceFromQuery
-        .createSegmentMapFunction(
-            query,
-            cpuAccumulator
-        );
-
+        .createSegmentMapFunction(query);
 
     final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = 
conglomerate.findFactory(query);
     final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java 
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 3317375347d..b3297ce9a60 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -66,6 +66,7 @@ import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Interval;
 
 import java.util.Collections;
@@ -194,9 +195,10 @@ public class ServerManager implements QuerySegmentWalker
     } else {
       return new 
ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
     }
-    final Function<SegmentReference, SegmentReference> segmentMapFn =
-        dataSourceFromQuery
-             .createSegmentMapFunction(newQuery, cpuTimeAccumulator);
+    final Function<SegmentReference, SegmentReference> segmentMapFn = 
JvmUtils.safeAccumulateThreadCpuTime(
+        cpuTimeAccumulator,
+        () -> dataSourceFromQuery.createSegmentMapFunction(newQuery)
+    );
 
     // We compute the datasource's cache key here itself so it doesn't need to 
be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = 
Optional.ofNullable(dataSourceFromQuery.getCacheKey());
diff --git 
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
 
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 464092d7ccb..1f0210422dd 100644
--- 
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++ 
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -63,7 +63,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -158,8 +157,7 @@ public class TestClusterQuerySegmentWalker implements 
QuerySegmentWalker
     }
 
     final Function<SegmentReference, SegmentReference> segmentMapFn = 
dataSourceFromQuery.createSegmentMapFunction(
-        query,
-        new AtomicLong()
+        query
     );
 
     final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
index e74a0eaefb6..c35dda4cda1 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
@@ -36,7 +36,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -126,8 +125,7 @@ public class ExternalDataSource implements DataSource
 
   @Override
   public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
+      Query query
   )
   {
     return Function.identity();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to