This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5b43c91411c Remove cpuTimeAcc from
DataSource#createSegmentMapFunction's signature (#17623)
5b43c91411c is described below
commit 5b43c91411cd5bacb000bc4b0386995f2c8834aa
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Feb 12 09:36:13 2025 +0100
Remove cpuTimeAcc from DataSource#createSegmentMapFunction's signature
(#17623)
This patch removes cpuTimeAcc from DataSource#createSegmentMapFunction
signature.
the method being executed doesn't need to know internally that they are
being measured; it also complicates code-flow a bit - as it tries to avoid
double counting.
---
.../querykit/BaseLeafFrameProcessorFactory.java | 3 +-
.../BroadcastJoinSegmentMapFnProcessor.java | 2 +-
.../druid/msq/querykit/InputNumberDataSource.java | 3 +-
.../msq/querykit/SimpleSegmentMapFnProcessor.java | 2 +-
.../java/org/apache/druid/query/DataSource.java | 3 +-
.../org/apache/druid/query/FilteredDataSource.java | 18 +--
.../druid/query/FrameBasedInlineDataSource.java | 3 +-
.../org/apache/druid/query/InlineDataSource.java | 6 +-
.../org/apache/druid/query/JoinDataSource.java | 150 ++++++++++-----------
.../org/apache/druid/query/LookupDataSource.java | 6 +-
.../org/apache/druid/query/QueryDataSource.java | 6 +-
.../apache/druid/query/RestrictedDataSource.java | 17 +--
.../org/apache/druid/query/TableDataSource.java | 6 +-
.../org/apache/druid/query/UnionDataSource.java | 6 +-
.../org/apache/druid/query/UnnestDataSource.java | 14 +-
.../org/apache/druid/query/union/UnionQuery.java | 3 +-
.../apache/druid/query/QueryDataSourceTest.java | 9 +-
.../apache/druid/query/QueryRunnerTestHelper.java | 3 +-
.../apache/druid/segment/join/NoopDataSource.java | 6 +-
.../appenderator/SinkQuerySegmentWalker.java | 14 +-
.../druid/server/LocalQuerySegmentWalker.java | 6 +-
.../druid/server/coordination/ServerManager.java | 8 +-
.../server/TestClusterQuerySegmentWalker.java | 4 +-
.../sql/calcite/external/ExternalDataSource.java | 4 +-
24 files changed, 114 insertions(+), 188 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index 013b6d4c93c..3e6db775393 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -58,7 +58,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -163,7 +162,7 @@ public abstract class BaseLeafFrameProcessorFactory extends
BaseFrameProcessorFa
if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
- query.getDataSource().createSegmentMapFunction(query, new
AtomicLong());
+ query.getDataSource().createSegmentMapFunction(query);
processorManager =
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(()
-> segmentMapFnProcessor), processorManagerFn);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
index b47a450aa00..d3ce49d5f8e 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
@@ -193,7 +193,7 @@ public class BroadcastJoinSegmentMapFnProcessor implements
FrameProcessor<Functi
private Function<SegmentReference, SegmentReference>
createSegmentMapFunction()
{
- return
inlineChannelData(query.getDataSource()).createSegmentMapFunction(query, new
AtomicLong());
+ return
inlineChannelData(query.getDataSource()).createSegmentMapFunction(query);
}
DataSource inlineChannelData(final DataSource originalDataSource)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
index ccd3aef7573..05560796435 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
@@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -97,7 +96,7 @@ public class InputNumberDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
index 22ee0dd7fb4..9e5a8964bf2 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java
@@ -64,7 +64,7 @@ public class SimpleSegmentMapFnProcessor implements
FrameProcessor<Function<Segm
@Override
public ReturnOrAwait<Function<SegmentReference, SegmentReference>>
runIncrementally(final IntSet readableInputs)
{
- return
ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query,
new AtomicLong()));
+ return
ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query));
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java
b/processing/src/main/java/org/apache/druid/query/DataSource.java
index 7c5f52d08fe..3f140ffc302 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -113,7 +112,7 @@ public interface DataSource
* @param cpuTimeAcc the cpu time accumulator
* @return the segment function
*/
- Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query
query, AtomicLong cpuTimeAcc);
+ Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query
query);
/**
* Returns an updated datasource based on the specified new source.
diff --git
a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
index 1644b3218d3..0716fd156ba 100644
--- a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java
@@ -27,13 +27,12 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.FilteredSegment;
import org.apache.druid.segment.SegmentReference;
-import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -122,19 +121,10 @@ public class FilteredDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTimeAccumulator
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
- final Function<SegmentReference, SegmentReference> segmentMapFn =
base.createSegmentMapFunction(
- query,
- cpuTimeAccumulator
- );
- return JvmUtils.safeAccumulateThreadCpuTime(
- cpuTimeAccumulator,
- () -> baseSegment -> new
FilteredSegment(segmentMapFn.apply(baseSegment), filter)
- );
+ final Function<SegmentReference, SegmentReference> segmentMapFn =
base.createSegmentMapFunction(query);
+ return baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment),
filter);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
index 68d3db9a36d..060c06f3adf 100644
---
a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
+++
b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java
@@ -37,7 +37,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -171,7 +170,7 @@ public class FrameBasedInlineDataSource implements
DataSource
}
@Override
- public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
index a14eb63fe6f..1326938eddc 100644
--- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
@@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -243,10 +242,7 @@ public class InlineDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTimeAcc
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
index 9662a99ab81..f4ef4cc3fa8 100644
--- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -54,9 +54,9 @@ import
org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
-import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,7 +65,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -301,14 +300,12 @@ public class JoinDataSource implements DataSource
@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTimeAccumulator
+ Query query
)
{
return createSegmentMapFunctionInternal(
analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
analysis.getPreJoinableClauses(),
- cpuTimeAccumulator,
analysis.getBaseQuery().orElse(query)
);
}
@@ -444,86 +441,79 @@ public class JoinDataSource implements DataSource
private Function<SegmentReference, SegmentReference>
createSegmentMapFunctionInternal(
@Nullable final Filter baseFilter,
final List<PreJoinableClause> clauses,
- final AtomicLong cpuTimeAccumulator,
final Query<?> query
)
{
// compute column correlations here and RHS correlated values
- return JvmUtils.safeAccumulateThreadCpuTime(
- cpuTimeAccumulator,
- () -> {
- if (clauses.isEmpty()) {
- return Function.identity();
- } else {
- final JoinableClauses joinableClauses =
JoinableClauses.createClauses(
- clauses,
- joinableFactoryWrapper.getJoinableFactory()
- );
- final JoinFilterRewriteConfig filterRewriteConfig =
JoinFilterRewriteConfig.forQuery(query);
-
- // Pick off any join clauses that can be converted into filters.
- final Set<String> requiredColumns = query.getRequiredColumns();
- final Filter baseFilterToUse;
- final List<JoinableClause> clausesToUse;
-
- if (requiredColumns != null &&
filterRewriteConfig.isEnableRewriteJoinToFilter()) {
- final Pair<List<Filter>, List<JoinableClause>> conversionResult
= JoinableFactoryWrapper.convertJoinsToFilters(
- joinableClauses.getJoinableClauses(),
- requiredColumns,
-
Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(),
Integer.MAX_VALUE))
- );
-
- baseFilterToUse =
- Filters.maybeAnd(
- Lists.newArrayList(
- Iterables.concat(
- Collections.singleton(baseFilter),
- conversionResult.lhs
- )
- )
- ).orElse(null);
- clausesToUse = conversionResult.rhs;
- } else {
- baseFilterToUse = baseFilter;
- clausesToUse = joinableClauses.getJoinableClauses();
- }
-
- // Analyze remaining join clauses to see if filters on them can be
pushed down.
- final JoinFilterPreAnalysis joinFilterPreAnalysis =
JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
- new JoinFilterPreAnalysisKey(
- filterRewriteConfig,
- clausesToUse,
- query.getVirtualColumns(),
- Filters.maybeAnd(Arrays.asList(baseFilterToUse,
Filters.toFilter(query.getFilter())))
- .orElse(null)
+ if (clauses.isEmpty()) {
+ return Function.identity();
+ } else {
+ final JoinableClauses joinableClauses = JoinableClauses.createClauses(
+ clauses,
+ joinableFactoryWrapper.getJoinableFactory()
+ );
+ final JoinFilterRewriteConfig filterRewriteConfig =
JoinFilterRewriteConfig.forQuery(query);
+
+ // Pick off any join clauses that can be converted into filters.
+ final Set<String> requiredColumns = query.getRequiredColumns();
+ final Filter baseFilterToUse;
+ final List<JoinableClause> clausesToUse;
+
+ if (requiredColumns != null &&
filterRewriteConfig.isEnableRewriteJoinToFilter()) {
+ final Pair<List<Filter>, List<JoinableClause>> conversionResult =
JoinableFactoryWrapper.convertJoinsToFilters(
+ joinableClauses.getJoinableClauses(),
+ requiredColumns,
+
Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(),
Integer.MAX_VALUE))
+ );
+
+ baseFilterToUse =
+ Filters.maybeAnd(
+ Lists.newArrayList(
+ Iterables.concat(
+ Collections.singleton(baseFilter),
+ conversionResult.lhs
+ )
)
- );
- final Function<SegmentReference, SegmentReference> baseMapFn;
- // A join data source is not concrete
- // And isConcrete() of an unnest datasource delegates to its base
- // Hence, in the case of a Join -> Unnest -> Join
- // if we just use isConcrete on the left
- // the segment map function for the unnest would never get called
- // This calls us to delegate to the segmentMapFunction of the left
- // only when it is not a JoinDataSource
- if (left instanceof JoinDataSource) {
- baseMapFn = Function.identity();
- } else {
- baseMapFn = left.createSegmentMapFunction(
- query,
- cpuTimeAccumulator
- );
- }
- return baseSegment ->
- new HashJoinSegment(
- baseMapFn.apply(baseSegment),
- baseFilterToUse,
- GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
- joinFilterPreAnalysis
- );
- }
- }
- );
+ ).orElse(null);
+ clausesToUse = conversionResult.rhs;
+ } else {
+ baseFilterToUse = baseFilter;
+ clausesToUse = joinableClauses.getJoinableClauses();
+ }
+
+ // Analyze remaining join clauses to see if filters on them can be
pushed down.
+ final JoinFilterPreAnalysis joinFilterPreAnalysis =
JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
+ new JoinFilterPreAnalysisKey(
+ filterRewriteConfig,
+ clausesToUse,
+ query.getVirtualColumns(),
+ Filters.maybeAnd(Arrays.asList(baseFilterToUse,
Filters.toFilter(query.getFilter())))
+ .orElse(null)
+ )
+ );
+ final Function<SegmentReference, SegmentReference> baseMapFn;
+ // A join data source is not concrete
+ // And isConcrete() of an unnest datasource delegates to its base
+ // Hence, in the case of a Join -> Unnest -> Join
+ // if we just use isConcrete on the left
+ // the segment map function for the unnest would never get called
+ // This calls us to delegate to the segmentMapFunction of the left
+ // only when it is not a JoinDataSource
+ if (left instanceof JoinDataSource) {
+ baseMapFn = Function.identity();
+ } else {
+ baseMapFn = left.createSegmentMapFunction(
+ query
+ );
+ }
+ return baseSegment ->
+ new HashJoinSegment(
+ baseMapFn.apply(baseSegment),
+ baseFilterToUse,
+ GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
+ joinFilterPreAnalysis
+ );
+ }
}
/**
diff --git
a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
index 873ca889321..84a23cc0282 100644
--- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
@@ -30,7 +30,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -101,10 +100,7 @@ public class LookupDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTime
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
index 08dc44126fe..35f548def51 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
@@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@JsonTypeName("query")
@@ -110,12 +109,11 @@ public class QueryDataSource implements DataSource
@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTime
+ Query query
)
{
final Query<?> subQuery = this.getQuery();
- return subQuery.getDataSource().createSegmentMapFunction(subQuery,
cpuTime);
+ return subQuery.getDataSource().createSegmentMapFunction(subQuery);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
index f7f91072fc6..7f1303b4a47 100644
--- a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
@@ -29,14 +29,11 @@ import org.apache.druid.query.policy.NoRestrictionPolicy;
import org.apache.druid.query.policy.Policy;
import org.apache.druid.segment.RestrictedSegment;
import org.apache.druid.segment.SegmentReference;
-import org.apache.druid.utils.JvmUtils;
-
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -125,18 +122,10 @@ public class RestrictedDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTimeAccumulator
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
- return JvmUtils.safeAccumulateThreadCpuTime(
- cpuTimeAccumulator,
- () -> base.createSegmentMapFunction(
- query,
- cpuTimeAccumulator
- ).andThen((segment) -> (new RestrictedSegment(segment, policy)))
- );
+ final Function<SegmentReference, SegmentReference> segmentMapFn =
base.createSegmentMapFunction(query);
+ return baseSegment -> new
RestrictedSegment(segmentMapFn.apply(baseSegment), policy);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/TableDataSource.java
b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
index d735a75928d..22f6fb7a975 100644
--- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@JsonTypeName("table")
@@ -101,10 +100,7 @@ public class TableDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTime
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
index 0384ecbc885..3673608c0c8 100644
--- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
@@ -33,7 +33,6 @@ import org.apache.druid.utils.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -144,10 +143,7 @@ public class UnionDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTime
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
index 06acca13a75..b6b89823fe5 100644
--- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
@@ -28,13 +28,12 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.UnnestSegment;
import org.apache.druid.segment.VirtualColumn;
-import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -135,18 +134,13 @@ public class UnnestDataSource implements DataSource
@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTimeAccumulator
+ Query query
)
{
final Function<SegmentReference, SegmentReference> segmentMapFn =
base.createSegmentMapFunction(
- query,
- cpuTimeAccumulator
- );
- return JvmUtils.safeAccumulateThreadCpuTime(
- cpuTimeAccumulator,
- () -> baseSegment -> new
UnnestSegment(segmentMapFn.apply(baseSegment), virtualColumn, unnestFilter)
+ query
);
+ return baseSegment -> new UnnestSegment(segmentMapFn.apply(baseSegment),
virtualColumn, unnestFilter);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
index a999a23fde2..2d16f674e67 100644
--- a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java
@@ -49,7 +49,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class UnionQuery implements Query<Object>
@@ -271,7 +270,7 @@ public class UnionQuery implements Query<Object>
}
@Override
- public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
throw methodNotSupported();
}
diff --git
a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
index 96d8ddd05a5..cde52961abc 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java
@@ -32,7 +32,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class QueryDataSourceTest
@@ -174,17 +173,15 @@ public class QueryDataSourceTest
public void test_withSegmentMapFunction()
{
Function<SegmentReference, SegmentReference> parentsegmentMapFunction =
queryDataSource.createSegmentMapFunction(
- groupByQuery,
- new AtomicLong()
+ groupByQuery
);
Function<SegmentReference, SegmentReference> childsegmentMapFunction =
queryOnTableDataSource.createSegmentMapFunction(
- groupByQuery,
- new AtomicLong()
+ groupByQuery
);
// The segment functions should both be identity functions and equal
Assert.assertEquals(parentsegmentMapFunction, childsegmentMapFunction);
}
-
+
}
diff --git
a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index 120ef503b8a..1d0a21061df 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -79,7 +79,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -559,7 +558,7 @@ public class QueryRunnerTestHelper
{
final DataSource base = query.getDataSource();
- final SegmentReference segmentReference =
base.createSegmentMapFunction(query, new AtomicLong())
+ final SegmentReference segmentReference =
base.createSegmentMapFunction(query)
.apply(ReferenceCountingSegment.wrapRootGenerationSegment(adapter));
return makeQueryRunner(factory, segmentReference, runnerName);
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
index c211137c85e..70e5b110de4 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
@@ -27,7 +27,6 @@ import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -72,10 +71,7 @@ public class NoopDataSource implements DataSource
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTime
- )
+ public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
return Function.identity();
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index c256e82c6d2..6460f51ca57 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -75,6 +75,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
+import org.apache.druid.utils.JvmUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -201,11 +202,10 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
- final Function<SegmentReference, SegmentReference> segmentMapFn =
- dataSourceFromQuery.createSegmentMapFunction(
- query,
- cpuTimeAccumulator
- );
+ final Function<SegmentReference, SegmentReference> segmentMapFn =
JvmUtils.safeAccumulateThreadCpuTime(
+ cpuTimeAccumulator,
+ () -> dataSourceFromQuery.createSegmentMapFunction(query)
+ );
// We compute the join cache key here itself so it doesn't need to be
re-computed for every segment
final Optional<byte[]> cacheKeyPrefix =
Optional.ofNullable(query.getDataSource().getCacheKey());
@@ -453,7 +453,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
-
+
/**
* This class is responsible for emitting query/segment/time,
query/wait/time and query/segmentAndCache/Time metrics for a Sink.
* It accumulates query/segment/time and query/segmentAndCache/time metric
for each FireHydrant at the level of Sink.
@@ -599,7 +599,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
}
}
}
-
+
private static class SinkHolder implements Overshadowable<SinkHolder>
{
private final Sink sink;
diff --git
a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index ae6b67deb5f..9d036fb47c3 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -96,11 +96,7 @@ public class LocalQuerySegmentWalker implements
QuerySegmentWalker
final AtomicLong cpuAccumulator = new AtomicLong(0L);
final Function<SegmentReference, SegmentReference> segmentMapFn =
dataSourceFromQuery
- .createSegmentMapFunction(
- query,
- cpuAccumulator
- );
-
+ .createSegmentMapFunction(query);
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory =
conglomerate.findFactory(query);
final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 3317375347d..b3297ce9a60 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -66,6 +66,7 @@ import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.utils.JvmUtils;
import org.joda.time.Interval;
import java.util.Collections;
@@ -194,9 +195,10 @@ public class ServerManager implements QuerySegmentWalker
} else {
return new
ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
}
- final Function<SegmentReference, SegmentReference> segmentMapFn =
- dataSourceFromQuery
- .createSegmentMapFunction(newQuery, cpuTimeAccumulator);
+ final Function<SegmentReference, SegmentReference> segmentMapFn =
JvmUtils.safeAccumulateThreadCpuTime(
+ cpuTimeAccumulator,
+ () -> dataSourceFromQuery.createSegmentMapFunction(newQuery)
+ );
// We compute the datasource's cache key here itself so it doesn't need to
be re-computed for every segment
final Optional<byte[]> cacheKeyPrefix =
Optional.ofNullable(dataSourceFromQuery.getCacheKey());
diff --git
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 464092d7ccb..1f0210422dd 100644
---
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -63,7 +63,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -158,8 +157,7 @@ public class TestClusterQuerySegmentWalker implements
QuerySegmentWalker
}
final Function<SegmentReference, SegmentReference> segmentMapFn =
dataSourceFromQuery.createSegmentMapFunction(
- query,
- new AtomicLong()
+ query
);
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
index e74a0eaefb6..c35dda4cda1 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
@@ -36,7 +36,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -126,8 +125,7 @@ public class ExternalDataSource implements DataSource
@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
- Query query,
- AtomicLong cpuTime
+ Query query
)
{
return Function.identity();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]