This is an automated email from the ASF dual-hosted git repository.
atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d7aed2e863 Funnel Count - Multiple Strategies (no partitioning
requisites) (#11092)
d7aed2e863 is described below
commit d7aed2e8631ac1c0ee107d520f334f067117fbc1
Author: dario-liberman <[email protected]>
AuthorDate: Wed Aug 30 14:40:07 2023 +0200
Funnel Count - Multiple Strategies (no partitioning requisites) (#11092)
* FUNNEL_COUNT - aggregation strategies
* FUNNEL_COUNT - Aggregation Strategies Tests
* FUNNEL_COUNT - Aggregation Strategies Tests
* Refactor: Move strategy creation into a factory, make funnel count
aggregation function parametric
* Add license headers
* Simplify factory by postponing strategy construction and templatizing
sorted split
* Fix linter errors
---------
Co-authored-by: Dario Liberman <[email protected]>
---
.../function/AggregationFunctionFactory.java | 3 +-
.../function/FunnelCountAggregationFunction.java | 511 ---------------------
.../function/funnel/AggregationStrategy.java | 167 +++++++
.../function/funnel/BitmapAggregationStrategy.java | 44 ++
.../function/funnel/BitmapMergeStrategy.java | 52 +++
.../funnel/BitmapResultExtractionStrategy.java | 85 ++++
.../function/funnel/DictIdsWrapper.java | 36 ++
.../funnel/FunnelCountAggregationFunction.java | 188 ++++++++
.../FunnelCountAggregationFunctionFactory.java | 272 +++++++++++
.../FunnelCountSortedAggregationFunction.java | 129 ++++++
.../aggregation/function/funnel/MergeStrategy.java | 37 ++
.../function/funnel/PartitionedMergeStrategy.java | 50 ++
.../function/funnel/ResultExtractionStrategy.java | 52 +++
.../function/funnel/SetMergeStrategy.java | 52 +++
.../funnel/SetResultExtractionStrategy.java | 94 ++++
.../function/funnel/SortedAggregationResult.java | 67 +++
.../function/funnel/SortedAggregationStrategy.java | 44 ++
.../funnel/ThetaSketchAggregationStrategy.java | 73 +++
.../function/funnel/ThetaSketchMergeStrategy.java | 62 +++
.../ThetaSketchResultExtractionStrategy.java | 45 ++
.../pinot/queries/BaseFunnelCountQueriesTest.java | 41 +-
...Test.java => FunnelCountQueriesBitmapTest.java} | 34 +-
...> FunnelCountQueriesPartitionedSortedTest.java} | 22 +-
...java => FunnelCountQueriesPartitionedTest.java} | 28 +-
...tedTest.java => FunnelCountQueriesSetTest.java} | 34 +-
...java => FunnelCountQueriesThetaSketchTest.java} | 34 +-
26 files changed, 1708 insertions(+), 548 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 0f03ee8723..504a09391c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
+import
org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -352,7 +353,7 @@ public class AggregationFunctionFactory {
throw new IllegalArgumentException(
"Aggregation function: " + function + " is only supported in
selection without alias.");
case FUNNELCOUNT:
- return new FunnelCountAggregationFunction(arguments);
+ return new FunnelCountAggregationFunctionFactory(arguments).get();
default:
throw new IllegalArgumentException();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java
deleted file mode 100644
index 4eecad1002..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.function;
-
-import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.roaringbitmap.RoaringBitmap;
-
-
-/**
- * The {@code FunnelCountAggregationFunction} calculates the number of step
conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being
partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition
key within the segment, then sums up the
- * results from different segments.
- *
- * Example:
- * SELECT
- * dateTrunc('day', timestamp) AS ts,
- * FUNNEL_COUNT(
- * STEPS(url = '/addToCart', url = '/checkout', url =
'/orderConfirmation'),
- * CORRELATED_BY(user)
- * ) as step_counts
- * FROM user_log
- * WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
- * GROUP BY 1
- */
-public class FunnelCountAggregationFunction implements
AggregationFunction<List<Long>, LongArrayList> {
- final List<ExpressionContext> _expressions;
- final List<ExpressionContext> _stepExpressions;
- final List<ExpressionContext> _correlateByExpressions;
- final ExpressionContext _primaryCorrelationCol;
- final int _numSteps;
-
- final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
- final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
-
- public FunnelCountAggregationFunction(List<ExpressionContext> expressions) {
- _expressions = expressions;
- _correlateByExpressions =
Option.CORRELATE_BY.getInputExpressions(expressions);
- _primaryCorrelationCol =
Option.CORRELATE_BY.getFirstInputExpression(expressions);
- _stepExpressions = Option.STEPS.getInputExpressions(expressions);
- _numSteps = _stepExpressions.size();
- _sortedAggregationStrategy = new SortedAggregationStrategy();
- _bitmapAggregationStrategy = new BitmapAggregationStrategy();
- }
-
- @Override
- public String getResultColumnName() {
- return getType().getName().toLowerCase() + "(" +
_expressions.stream().map(ExpressionContext::toString)
- .collect(Collectors.joining(",")) + ")";
- }
-
- @Override
- public List<ExpressionContext> getInputExpressions() {
- final List<ExpressionContext> inputs = new ArrayList<>();
- inputs.addAll(_correlateByExpressions);
- inputs.addAll(_stepExpressions);
- return inputs;
- }
-
- @Override
- public AggregationFunctionType getType() {
- return AggregationFunctionType.FUNNELCOUNT;
- }
-
- @Override
- public AggregationResultHolder createAggregationResultHolder() {
- return new ObjectAggregationResultHolder();
- }
-
- @Override
- public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
- return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
- }
-
- @Override
- public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregate(length,
aggregationResultHolder, blockValSetMap);
- }
-
- @Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
-
getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupBySV(length,
groupKeyArray,
- groupByResultHolder, blockValSetMap);
- }
-
- @Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
-
getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupByMV(length,
groupKeysArray,
- groupByResultHolder, blockValSetMap);
- }
-
- @Override
- public List<Long> extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
- return
getAggregationStrategyByAggregationResult(aggregationResultHolder.getResult()).extractAggregationResult(
- aggregationResultHolder);
- }
-
- @Override
- public List<Long> extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
- return
getAggregationStrategyByAggregationResult(groupByResultHolder.getResult(groupKey)).extractGroupByResult(
- groupByResultHolder, groupKey);
- }
-
- @Override
- public List<Long> merge(List<Long> a, List<Long> b) {
- int length = a.size();
- Preconditions.checkState(length == b.size(), "The two operand arrays are
not of the same size! provided %s, %s",
- length, b.size());
-
- LongArrayList result = toLongArrayList(a);
- long[] elements = result.elements();
- for (int i = 0; i < length; i++) {
- elements[i] += b.get(i);
- }
- return result;
- }
-
- @Override
- public ColumnDataType getIntermediateResultColumnType() {
- return ColumnDataType.OBJECT;
- }
-
- @Override
- public ColumnDataType getFinalResultColumnType() {
- return ColumnDataType.LONG_ARRAY;
- }
-
- @Override
- public LongArrayList extractFinalResult(List<Long> result) {
- return toLongArrayList(result);
- }
-
- @Override
- public String toExplainString() {
- StringBuilder stringBuilder = new
StringBuilder(getType().getName()).append('(');
- int numArguments = getInputExpressions().size();
- if (numArguments > 0) {
- stringBuilder.append(getInputExpressions().get(0).toString());
- for (int i = 1; i < numArguments; i++) {
- stringBuilder.append(",
").append(getInputExpressions().get(i).toString());
- }
- }
- return stringBuilder.append(')').toString();
- }
-
- private static LongArrayList toLongArrayList(List<Long> longList) {
- return longList instanceof LongArrayList ? ((LongArrayList)
longList).clone() : new LongArrayList(longList);
- }
-
- private int[] getCorrelationIds(Map<ExpressionContext, BlockValSet>
blockValSetMap) {
- return blockValSetMap.get(_primaryCorrelationCol).getDictionaryIdsSV();
- }
-
- private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap)
{
- final int[][] steps = new int[_numSteps][];
- for (int n = 0; n < _numSteps; n++) {
- final BlockValSet stepBlockValSet =
blockValSetMap.get(_stepExpressions.get(n));
- steps[n] = stepBlockValSet.getIntValuesSV();
- }
- return steps;
- }
-
- private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap)
{
- final Dictionary primaryCorrelationDictionary =
blockValSetMap.get(_primaryCorrelationCol).getDictionary();
- if (primaryCorrelationDictionary == null) {
- throw new IllegalArgumentException(
- "CORRELATE_BY column in FUNNELCOUNT aggregation function not
supported, please use a dictionary encoded "
- + "column.");
- }
- return primaryCorrelationDictionary.isSorted();
- }
-
- private SegmentAggregationStrategy<?, List<Long>>
getAggregationStrategyByBlockValSetMap(
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- return isSorted(blockValSetMap) ? _sortedAggregationStrategy :
_bitmapAggregationStrategy;
- }
-
- private SegmentAggregationStrategy<?, List<Long>>
getAggregationStrategyByAggregationResult(Object aggResult) {
- return aggResult instanceof SortedAggregationResult ?
_sortedAggregationStrategy : _bitmapAggregationStrategy;
- }
-
- enum Option {
- STEPS("steps"),
- CORRELATE_BY("correlateby");
-
- final String _name;
-
- Option(String name) {
- _name = name;
- }
-
- boolean matches(ExpressionContext expression) {
- if (expression.getType() != ExpressionContext.Type.FUNCTION) {
- return false;
- }
- return _name.equals(expression.getFunction().getFunctionName());
- }
-
- Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
- return expressions.stream().filter(this::matches).findFirst();
- }
-
- public List<ExpressionContext> getInputExpressions(List<ExpressionContext>
expressions) {
- return this.find(expressions).map(exp ->
exp.getFunction().getArguments())
- .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT requires "
+ _name));
- }
-
- public ExpressionContext getFirstInputExpression(List<ExpressionContext>
expressions) {
- return this.find(expressions)
- .flatMap(exp ->
exp.getFunction().getArguments().stream().findFirst())
- .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT: " + _name
+ " requires an argument."));
- }
- }
-
- /**
- * Interface for segment aggregation strategy.
- *
- * <p>The implementation should be stateless, and can be shared among
multiple segments in multiple threads. The
- * result for each segment should be stored and passed in via the result
holder.
- * There should be no assumptions beyond segment boundaries, different
aggregation strategies may be utilized
- * across different segments for a given query.
- *
- * @param <A> Aggregation result accumulated across blocks within segment,
kept by result holder.
- * @param <I> Intermediate result at segment level (extracted from
aforementioned aggregation result).
- */
- @ThreadSafe
- static abstract class SegmentAggregationStrategy<A, I> {
-
- /**
- * Returns an aggregation result for this aggregation strategy to be kept
in a result holder (aggregation only).
- */
- abstract A createAggregationResult();
-
- public A getAggregationResultGroupBy(GroupByResultHolder
groupByResultHolder, int groupKey) {
- A aggResult = groupByResultHolder.getResult(groupKey);
- if (aggResult == null) {
- aggResult = createAggregationResult();
- groupByResultHolder.setValueForKey(groupKey, aggResult);
- }
- return aggResult;
- }
-
- public A getAggregationResult(AggregationResultHolder
aggregationResultHolder) {
- A aggResult = aggregationResultHolder.getResult();
- if (aggResult == null) {
- aggResult = createAggregationResult();
- aggregationResultHolder.setValue(aggResult);
- }
- return aggResult;
- }
-
- /**
- * Performs aggregation on the given block value sets (aggregation only).
- */
- abstract void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap);
-
- /**
- * Performs aggregation on the given group key array and block value sets
(aggregation group-by on single-value
- * columns).
- */
- abstract void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap);
-
- /**
- * Performs aggregation on the given group keys array and block value sets
(aggregation group-by on multi-value
- * columns).
- */
- abstract void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap);
-
- /**
- * Extracts the intermediate result from the aggregation result holder
(aggregation only).
- */
- public I extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
- return extractIntermediateResult(aggregationResultHolder.getResult());
- }
-
- /**
- * Extracts the intermediate result from the group-by result holder for
the given group key (aggregation group-by).
- */
- public I extractGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey) {
- return
extractIntermediateResult(groupByResultHolder.getResult(groupKey));
- }
-
- abstract I extractIntermediateResult(A aggregationResult);
- }
-
- /**
- * Aggregation strategy leveraging roaring bitmap algebra
(unions/intersections).
- */
- class BitmapAggregationStrategy extends
SegmentAggregationStrategy<RoaringBitmap[], List<Long>> {
-
- @Override
- public RoaringBitmap[] createAggregationResult() {
- final RoaringBitmap[] stepsBitmaps = new RoaringBitmap[_numSteps];
- for (int n = 0; n < _numSteps; n++) {
- stepsBitmaps[n] = new RoaringBitmap();
- }
- return stepsBitmaps;
- }
-
- @Override
- public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] correlationIds = getCorrelationIds(blockValSetMap);
- final int[][] steps = getSteps(blockValSetMap);
-
- final RoaringBitmap[] stepsBitmaps =
getAggregationResult(aggregationResultHolder);
-
- for (int n = 0; n < _numSteps; n++) {
- for (int i = 0; i < length; i++) {
- if (steps[n][i] > 0) {
- stepsBitmaps[n].add(correlationIds[i]);
- }
- }
- }
- }
-
- @Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] correlationIds = getCorrelationIds(blockValSetMap);
- final int[][] steps = getSteps(blockValSetMap);
-
- for (int n = 0; n < _numSteps; n++) {
- for (int i = 0; i < length; i++) {
- final int groupKey = groupKeyArray[i];
- if (steps[n][i] > 0) {
- getAggregationResultGroupBy(groupByResultHolder,
groupKey)[n].add(correlationIds[i]);
- }
- }
- }
- }
-
- @Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] correlationIds = getCorrelationIds(blockValSetMap);
- final int[][] steps = getSteps(blockValSetMap);
-
- for (int n = 0; n < _numSteps; n++) {
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- if (steps[n][i] > 0) {
- getAggregationResultGroupBy(groupByResultHolder,
groupKey)[n].add(correlationIds[i]);
- }
- }
- }
- }
- }
-
- @Override
- public List<Long> extractIntermediateResult(RoaringBitmap[] stepsBitmaps) {
- if (stepsBitmaps == null) {
- return new LongArrayList(_numSteps);
- }
-
- long[] result = new long[_numSteps];
- result[0] = stepsBitmaps[0].getCardinality();
- for (int i = 1; i < _numSteps; i++) {
- // intersect this step with previous step
- stepsBitmaps[i].and(stepsBitmaps[i - 1]);
- result[i] = stepsBitmaps[i].getCardinality();
- }
- return LongArrayList.wrap(result);
- }
- }
-
- /**
- * Aggregation strategy for segments sorted by the main correlation column.
- */
- class SortedAggregationStrategy extends
SegmentAggregationStrategy<SortedAggregationResult, List<Long>> {
-
- @Override
- public SortedAggregationResult createAggregationResult() {
- return new SortedAggregationResult();
- }
-
- @Override
- public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] correlationIds = getCorrelationIds(blockValSetMap);
- final int[][] steps = getSteps(blockValSetMap);
-
- final SortedAggregationResult agg =
getAggregationResult(aggregationResultHolder);
-
- for (int i = 0; i < length; i++) {
- agg.sortedCount(steps, i, correlationIds[i]);
- }
- }
-
- @Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] correlationIds = getCorrelationIds(blockValSetMap);
- final int[][] steps = getSteps(blockValSetMap);
-
- for (int i = 0; i < length; i++) {
- final int groupKey = groupKeyArray[i];
- final SortedAggregationResult agg =
getAggregationResultGroupBy(groupByResultHolder, groupKey);
-
- agg.sortedCount(steps, i, correlationIds[i]);
- }
- }
-
- @Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] correlationIds = getCorrelationIds(blockValSetMap);
- final int[][] steps = getSteps(blockValSetMap);
-
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- final SortedAggregationResult agg =
getAggregationResultGroupBy(groupByResultHolder, groupKey);
-
- agg.sortedCount(steps, i, correlationIds[i]);
- }
- }
- }
-
- @Override
- public List<Long> extractIntermediateResult(SortedAggregationResult agg) {
- if (agg == null) {
- return new LongArrayList(_numSteps);
- }
-
- return LongArrayList.wrap(agg.extractResult());
- }
- }
-
- /**
- * Aggregation result data structure leveraged by sorted aggregation
strategy.
- */
- class SortedAggregationResult {
- public long[] _stepCounters = new long[_numSteps];
- public int _lastCorrelationId = Integer.MIN_VALUE;
- public boolean[] _correlatedSteps = new boolean[_numSteps];
-
- public void sortedCount(int[][] steps, int i, int correlationId) {
- if (correlationId == _lastCorrelationId) {
- // same correlation as before, keep accumulating.
- for (int n = 0; n < _numSteps; n++) {
- _correlatedSteps[n] |= steps[n][i] > 0;
- }
- } else {
- // End of correlation group, calculate funnel conversion counts
- incrStepCounters();
-
- // initialize next correlation group
- for (int n = 0; n < _numSteps; n++) {
- _correlatedSteps[n] = steps[n][i] > 0;
- }
- _lastCorrelationId = correlationId;
- }
- }
-
- void incrStepCounters() {
- for (int n = 0; n < _numSteps; n++) {
- if (!_correlatedSteps[n]) {
- break;
- }
- _stepCounters[n]++;
- }
- }
-
- public long[] extractResult() {
- // count last correlation id left open
- incrStepCounters();
-
- return _stepCounters;
- }
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/AggregationStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/AggregationStrategy.java
new file mode 100644
index 0000000000..298fd4a805
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/AggregationStrategy.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Base class for the within-segment aggregation strategies of FUNNEL_COUNT.
+ *
+ * <p>Implementations only hold immutable configuration (the step and correlation expressions) and can be shared
+ * among multiple segments in multiple threads. The per-segment state is the aggregation result stored in, and
+ * passed in via, the result holder.
+ * There should be no assumptions beyond segment boundaries, different aggregation strategies may be utilized
+ * across different segments for a given query.
+ *
+ * @param <A> Aggregation result accumulated across blocks within segment, kept by result holder.
+ */
+@ThreadSafe
+public abstract class AggregationStrategy<A> {
+
+  protected final int _numSteps;
+  private final List<ExpressionContext> _stepExpressions;
+  private final List<ExpressionContext> _correlateByExpressions;
+  private final ExpressionContext _primaryCorrelationCol;
+
+  /**
+   * @param stepExpressions funnel step expressions, in funnel order; each is read per row as an int value where
+   *                        {@code > 0} means the row matched the step
+   * @param correlateByExpressions correlation expressions; the first one is the primary correlation column, which
+   *                               must be dictionary encoded
+   * @throws IllegalArgumentException if no correlation expression is provided
+   */
+  public AggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions) {
+    // Fail with a clear message instead of an IndexOutOfBoundsException from get(0) below.
+    Preconditions.checkArgument(correlateByExpressions != null && !correlateByExpressions.isEmpty(),
+        "FUNNELCOUNT: correlateby requires an argument.");
+    _stepExpressions = stepExpressions;
+    _correlateByExpressions = correlateByExpressions;
+    _primaryCorrelationCol = _correlateByExpressions.get(0);
+    _numSteps = _stepExpressions.size();
+  }
+
+  /**
+   * Returns an aggregation result for this aggregation strategy to be kept in a result holder (aggregation only).
+   */
+  abstract A createAggregationResult(Dictionary dictionary);
+
+  /**
+   * Returns the aggregation result stored for {@code groupKey}, creating and storing a new one if absent.
+   */
+  public A getAggregationResultGroupBy(Dictionary dictionary, GroupByResultHolder groupByResultHolder, int groupKey) {
+    A aggResult = groupByResultHolder.getResult(groupKey);
+    if (aggResult == null) {
+      aggResult = createAggregationResult(dictionary);
+      groupByResultHolder.setValueForKey(groupKey, aggResult);
+    }
+    return aggResult;
+  }
+
+  /**
+   * Returns the aggregation result stored in the holder, creating and storing a new one if absent.
+   */
+  public A getAggregationResult(Dictionary dictionary, AggregationResultHolder aggregationResultHolder) {
+    A aggResult = aggregationResultHolder.getResult();
+    if (aggResult == null) {
+      aggResult = createAggregationResult(dictionary);
+      aggregationResultHolder.setValue(aggResult);
+    }
+    return aggResult;
+  }
+
+  /**
+   * Performs aggregation on the given block value sets (aggregation only).
+   */
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary dictionary = getDictionary(blockValSetMap);
+    final int[] correlationIds = getCorrelationIds(blockValSetMap);
+    final int[][] steps = getSteps(blockValSetMap);
+
+    final A aggResult = getAggregationResult(dictionary, aggregationResultHolder);
+    for (int i = 0; i < length; i++) {
+      addMatchingSteps(dictionary, aggResult, steps, i, correlationIds[i]);
+    }
+  }
+
+  /**
+   * Performs aggregation on the given group key array and block value sets (aggregation group-by on single-value
+   * columns).
+   */
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary dictionary = getDictionary(blockValSetMap);
+    final int[] correlationIds = getCorrelationIds(blockValSetMap);
+    final int[][] steps = getSteps(blockValSetMap);
+
+    for (int i = 0; i < length; i++) {
+      // Look the group result up once per row instead of once per step. As before, it is created for the row's
+      // group even when none of the steps matched.
+      final A aggResult = getAggregationResultGroupBy(dictionary, groupByResultHolder, groupKeyArray[i]);
+      addMatchingSteps(dictionary, aggResult, steps, i, correlationIds[i]);
+    }
+  }
+
+  /**
+   * Performs aggregation on the given group keys array and block value sets (aggregation group-by on multi-value
+   * columns).
+   */
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary dictionary = getDictionary(blockValSetMap);
+    final int[] correlationIds = getCorrelationIds(blockValSetMap);
+    final int[][] steps = getSteps(blockValSetMap);
+
+    for (int i = 0; i < length; i++) {
+      for (int groupKey : groupKeysArray[i]) {
+        // One lookup per (row, group key) instead of one per (row, step, group key).
+        final A aggResult = getAggregationResultGroupBy(dictionary, groupByResultHolder, groupKey);
+        addMatchingSteps(dictionary, aggResult, steps, i, correlationIds[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds a correlation id to the aggregation counter for a given step in the funnel.
+   */
+  abstract void add(Dictionary dictionary, A aggResult, int step, int correlationId);
+
+  /**
+   * Adds {@code correlationId} to {@code aggResult} for every step whose expression is {@code > 0} on {@code row}.
+   */
+  private void addMatchingSteps(Dictionary dictionary, A aggResult, int[][] steps, int row, int correlationId) {
+    for (int n = 0; n < _numSteps; n++) {
+      if (steps[n][row] > 0) {
+        add(dictionary, aggResult, n, correlationId);
+      }
+    }
+  }
+
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
+            + "column.");
+    return primaryCorrelationDictionary;
+  }
+
+  private int[] getCorrelationIds(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    return blockValSetMap.get(_primaryCorrelationCol).getDictionaryIdsSV();
+  }
+
+  private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final int[][] steps = new int[_numSteps][];
+    for (int n = 0; n < _numSteps; n++) {
+      final BlockValSet stepBlockValSet = blockValSetMap.get(_stepExpressions.get(n));
+      steps[n] = stepBlockValSet.getIntValuesSV();
+    }
+    return steps;
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapAggregationStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapAggregationStrategy.java
new file mode 100644
index 0000000000..f726d93620
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapAggregationStrategy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Within-segment aggregation strategy based on roaring bitmap algebra (unions/intersections).
+ *
+ * <p>For every funnel step, the dictionary ids of the primary correlation column that matched the step are
+ * recorded in a {@link DictIdsWrapper}, which is created against the segment's correlation dictionary.
+ */
+class BitmapAggregationStrategy extends AggregationStrategy<DictIdsWrapper> {
+
+  public BitmapAggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions) {
+    super(stepExpressions, correlateByExpressions);
+  }
+
+  /**
+   * Creates an empty per-step container bound to the given correlation column dictionary.
+   */
+  @Override
+  public DictIdsWrapper createAggregationResult(Dictionary dictionary) {
+    return new DictIdsWrapper(_numSteps, dictionary);
+  }
+
+  /**
+   * Records that the correlation dictionary id {@code correlationId} reached funnel step {@code step}.
+   */
+  @Override
+  protected void add(Dictionary dictionary, DictIdsWrapper aggResult, int step, int correlationId) {
+    aggResult._stepsBitmaps[step].add(correlationId);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapMergeStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapMergeStrategy.java
new file mode 100644
index 0000000000..0be8bbadbe
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapMergeStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.List;
+import org.roaringbitmap.RoaringBitmap;
+
+
+class BitmapMergeStrategy implements MergeStrategy<List<RoaringBitmap>> {
+ protected final int _numSteps;
+
+ BitmapMergeStrategy(int numSteps) {
+ _numSteps = numSteps;
+ }
+
+ @Override
+ public List<RoaringBitmap> merge(List<RoaringBitmap> intermediateResult1,
List<RoaringBitmap> intermediateResult2) {
+ for (int i = 0; i < _numSteps; i++) {
+ intermediateResult1.get(i).or(intermediateResult2.get(i));
+ }
+ return intermediateResult1;
+ }
+
+ @Override
+ public LongArrayList extractFinalResult(List<RoaringBitmap> stepsBitmaps) {
+ long[] result = new long[_numSteps];
+ result[0] = stepsBitmaps.get(0).getCardinality();
+ for (int i = 1; i < _numSteps; i++) {
+ // intersect this step with previous step
+ stepsBitmaps.get(i).and(stepsBitmaps.get(i - 1));
+ result[i] = stepsBitmaps.get(i).getCardinality();
+ }
+ return LongArrayList.wrap(result);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java
new file mode 100644
index 0000000000..b60bad7e0c
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+class BitmapResultExtractionStrategy implements
ResultExtractionStrategy<DictIdsWrapper, List<RoaringBitmap>> {
+ protected final int _numSteps;
+
+ BitmapResultExtractionStrategy(int numSteps) {
+ _numSteps = numSteps;
+ }
+
+ @Override
+ public List<RoaringBitmap> extractIntermediateResult(DictIdsWrapper
dictIdsWrapper) {
+ Dictionary dictionary = dictIdsWrapper._dictionary;
+ List<RoaringBitmap> result = new ArrayList<>(_numSteps);
+ for (RoaringBitmap dictIdBitmap : dictIdsWrapper._stepsBitmaps) {
+ result.add(convertToValueBitmap(dictionary, dictIdBitmap));
+ }
+ return result;
+ }
+
+ /**
+ * Helper method to read dictionary and convert dictionary ids to hash code
of the values for dictionary-encoded
+ * expression.
+ */
+ private RoaringBitmap convertToValueBitmap(Dictionary dictionary,
RoaringBitmap dictIdBitmap) {
+ RoaringBitmap valueBitmap = new RoaringBitmap();
+ PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
+ FieldSpec.DataType storedType = dictionary.getValueType();
+ switch (storedType) {
+ case INT:
+ while (iterator.hasNext()) {
+ valueBitmap.add(dictionary.getIntValue(iterator.next()));
+ }
+ break;
+ case LONG:
+ while (iterator.hasNext()) {
+
valueBitmap.add(Long.hashCode(dictionary.getLongValue(iterator.next())));
+ }
+ break;
+ case FLOAT:
+ while (iterator.hasNext()) {
+
valueBitmap.add(Float.hashCode(dictionary.getFloatValue(iterator.next())));
+ }
+ break;
+ case DOUBLE:
+ while (iterator.hasNext()) {
+
valueBitmap.add(Double.hashCode(dictionary.getDoubleValue(iterator.next())));
+ }
+ break;
+ case STRING:
+ while (iterator.hasNext()) {
+
valueBitmap.add(dictionary.getStringValue(iterator.next()).hashCode());
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal data type for FUNNEL_COUNT
aggregation function: " + storedType);
+ }
+ return valueBitmap;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java
new file mode 100644
index 0000000000..c09d0128f2
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.roaringbitmap.RoaringBitmap;
+
+
+final class DictIdsWrapper {
+ final Dictionary _dictionary;
+ final RoaringBitmap[] _stepsBitmaps;
+
+ DictIdsWrapper(int numSteps, Dictionary dictionary) {
+ _dictionary = dictionary;
+ _stepsBitmaps = new RoaringBitmap[numSteps];
+ for (int n = 0; n < numSteps; n++) {
+ _stepsBitmaps[n] = new RoaringBitmap();
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunction.java
new file mode 100644
index 0000000000..3c258277db
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunction.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code FunnelCountAggregationFunction} calculates the number of
conversions for a given correlation column and
+ * a list of steps as boolean expressions.
+ *
+ * @param <A> Aggregation result accumulated across blocks within segment,
kept by result holder.
+ * @param <I> Intermediate result at segment level (extracted from
aforementioned aggregation result).
+ *
+ * Example:
+ * SELECT
+ * dateTrunc('day', timestamp) AS ts,
+ * FUNNEL_COUNT(
+ * STEPS(url = '/addToCart', url = '/checkout', url =
'/orderConfirmation'),
+ * CORRELATE_BY(user_id)
+ * ) as step_counts
+ * FROM user_log
+ * WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
+ * GROUP BY 1
+ *
+ * Counting strategies can be controlled via optional SETTINGS options, for
example:
+ *
+ * FUNNEL_COUNT(
+ * STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ * CORRELATE_BY(user_id),
+ * SETTINGS('theta_sketch','nominalEntries=4096')
+ * )
+ *
+ * Please refer to {@link FunnelCountAggregationFunctionFactory} to learn
about counting strategies available.
+ *
+ * @see FunnelCountAggregationFunctionFactory
+ * @see FunnelCountSortedAggregationFunction
+ */
+public class FunnelCountAggregationFunction<A, I> implements
AggregationFunction<I, LongArrayList> {
+ private final List<ExpressionContext> _expressions;
+ private final List<ExpressionContext> _stepExpressions;
+ private final List<ExpressionContext> _correlateByExpressions;
+ private final int _numSteps;
+
+ private final AggregationStrategy<A> _aggregationStrategy;
+ private final ResultExtractionStrategy<A, I> _resultExtractionStrategy;
+ private final MergeStrategy<I> _mergeStrategy;
+
+ public FunnelCountAggregationFunction(List<ExpressionContext> expressions,
List<ExpressionContext> stepExpressions,
+ List<ExpressionContext> correlateByExpressions, AggregationStrategy<A>
aggregationStrategy,
+ ResultExtractionStrategy<A, I> resultExtractionStrategy,
MergeStrategy<I> mergeStrategy) {
+ _expressions = expressions;
+ _stepExpressions = stepExpressions;
+ _correlateByExpressions = correlateByExpressions;
+ _aggregationStrategy = aggregationStrategy;
+ _resultExtractionStrategy = resultExtractionStrategy;
+ _mergeStrategy = mergeStrategy;
+ _numSteps = _stepExpressions.size();
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" +
_expressions.stream().map(ExpressionContext::toString)
+ .collect(Collectors.joining(",")) + ")";
+ }
+
+ @Override
+ public List<ExpressionContext> getInputExpressions() {
+ final List<ExpressionContext> inputs = new ArrayList<>();
+ inputs.addAll(_correlateByExpressions);
+ inputs.addAll(_stepExpressions);
+ return inputs;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.FUNNELCOUNT;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _aggregationStrategy.aggregate(length, aggregationResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _aggregationStrategy.aggregateGroupBySV(length, groupKeyArray,
groupByResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _aggregationStrategy.aggregateGroupByMV(length, groupKeysArray,
groupByResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public I extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ return
_resultExtractionStrategy.extractAggregationResult(aggregationResultHolder);
+ }
+
+ @Override
+ public I extractGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey) {
+ return _resultExtractionStrategy.extractGroupByResult(groupByResultHolder,
groupKey);
+ }
+
+ @Override
+ public I merge(I a, I b) {
+ if (a == null) {
+ return b;
+ }
+ if (b == null) {
+ return a;
+ }
+ return _mergeStrategy.merge(a, b);
+ }
+
+ @Override
+ public LongArrayList extractFinalResult(I intermediateResult) {
+ if (intermediateResult == null) {
+ return new LongArrayList(_numSteps);
+ }
+ return _mergeStrategy.extractFinalResult(intermediateResult);
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG_ARRAY;
+ }
+
+ @Override
+ public String toExplainString() {
+ StringBuilder stringBuilder = new
StringBuilder(getType().getName()).append('(');
+ int numArguments = getInputExpressions().size();
+ if (numArguments > 0) {
+ stringBuilder.append(getInputExpressions().get(0).toString());
+ for (int i = 1; i < numArguments; i++) {
+ stringBuilder.append(",
").append(getInputExpressions().get(i).toString());
+ }
+ }
+ return stringBuilder.append(')').toString();
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunctionFactory.java
new file mode 100644
index 0000000000..dc9c14b3d4
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunctionFactory.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.thetacommon.ThetaUtil;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountBitmapAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.SegmentPartitionedDistinctCountAggregationFunction;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code FunnelCountAggregationFunctionFactory} builds a {@code
FunnelCountAggregationFunction}.
+ * Primary role is to validate inputs and select the appropriate aggregation
strategy to use based on settings.
+ *
+ * There are 5 strategies available, mirroring the corresponding distinct
count implementations as per below.
+ * <p><ul>
+ * <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ * <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link
DistinctCountBitmapAggregationFunction}
+ * <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link
DistinctCountThetaSketchAggregationFunction}
+ * <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link
SegmentPartitionedDistinctCountAggregationFunction}
+ * <li>'sorted': sorted counts per segment then sums up. Only availabe in
combination with 'partitioned'.
+ * <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ * </ul><p>
+ */
+public class FunnelCountAggregationFunctionFactory implements
Supplier<AggregationFunction> {
+ final List<ExpressionContext> _expressions;
+ final List<ExpressionContext> _stepExpressions;
+ final List<ExpressionContext> _correlateByExpressions;
+ final ExpressionContext _primaryCorrelationCol;
+ final int _numSteps;
+ final int _nominalEntries;
+ final boolean _partitionSetting;
+ final boolean _sortingSetting;
+ final boolean _thetaSketchSetting;
+ final boolean _setSetting;
+
+ public FunnelCountAggregationFunctionFactory(List<ExpressionContext>
expressions) {
+ _expressions = expressions;
+ Option.validate(expressions);
+ _correlateByExpressions =
Option.CORRELATE_BY.getInputExpressions(expressions);
+ _primaryCorrelationCol = _correlateByExpressions.get(0);
+ _stepExpressions = Option.STEPS.getInputExpressions(expressions);
+ _numSteps = _stepExpressions.size();
+
+ final List<String> settings = Option.SETTINGS.getLiterals(expressions);
+ Setting.validate(settings);
+ _setSetting = Setting.SET.isSet(settings);
+ _partitionSetting = Setting.PARTITIONED.isSet(settings);
+ _sortingSetting = Setting.SORTED.isSet(settings);
+ _thetaSketchSetting = Setting.THETA_SKETCH.isSet(settings);
+ _nominalEntries =
Setting.NOMINAL_ENTRIES.getInteger(settings).orElse(ThetaUtil.DEFAULT_NOMINAL_ENTRIES);
+ }
+
+ public AggregationFunction get() {
+ if (_partitionSetting) {
+ if (_thetaSketchSetting) {
+ // theta_sketch && partitioned
+ return
createPartionedFunnelCountAggregationFunction(thetaSketchAggregationStrategy(),
+ thetaSketchPartitionedResultExtractionStrategy(),
partitionedMergeStrategy());
+ } else {
+ // partitioned && !theta_sketch
+ return
createPartionedFunnelCountAggregationFunction(bitmapAggregationStrategy(),
+ bitmapPartitionedResultExtractionStrategy(),
partitionedMergeStrategy());
+ }
+ } else {
+ if (_thetaSketchSetting) {
+ // theta_sketch && !partitioned
+ return
createFunnelCountAggregationFunction(thetaSketchAggregationStrategy(),
+ thetaSketchResultExtractionStrategy(), thetaSketchMergeStrategy());
+ } else if (_setSetting) {
+ // set && !partitioned && !theta_sketch
+ return
createFunnelCountAggregationFunction(bitmapAggregationStrategy(),
setResultExtractionStrategy(),
+ setMergeStrategy());
+ } else {
+ // default (bitmap)
+ // !partitioned && !theta_sketch && !set
+ return
createFunnelCountAggregationFunction(bitmapAggregationStrategy(),
bitmapResultExtractionStrategy(),
+ bitmapMergeStrategy());
+ }
+ }
+ }
+
+ private <A, I> FunnelCountAggregationFunction<A, I>
createFunnelCountAggregationFunction(
+ AggregationStrategy<A> aggregationStrategy, ResultExtractionStrategy<A,
I> resultExtractionStrategy,
+ MergeStrategy<I> mergeStrategy) {
+ return new FunnelCountAggregationFunction<>(_expressions,
_stepExpressions, _correlateByExpressions,
+ aggregationStrategy, resultExtractionStrategy, mergeStrategy);
+ }
+
+ private <A> FunnelCountAggregationFunction<A, List<Long>>
createPartionedFunnelCountAggregationFunction(
+ AggregationStrategy<A> aggregationStrategy, ResultExtractionStrategy<A,
List<Long>> resultExtractionStrategy,
+ MergeStrategy<List<Long>> mergeStrategy) {
+ if (_sortingSetting) {
+ return new FunnelCountSortedAggregationFunction<>(_expressions,
_stepExpressions, _correlateByExpressions,
+ aggregationStrategy, resultExtractionStrategy, mergeStrategy);
+ } else {
+ return new FunnelCountAggregationFunction<>(_expressions,
_stepExpressions, _correlateByExpressions,
+ aggregationStrategy, resultExtractionStrategy, mergeStrategy);
+ }
+ }
+
+ AggregationStrategy<UpdateSketch[]> thetaSketchAggregationStrategy() {
+ return new ThetaSketchAggregationStrategy(_stepExpressions,
_correlateByExpressions, _nominalEntries);
+ }
+
+ AggregationStrategy<DictIdsWrapper> bitmapAggregationStrategy() {
+ return new BitmapAggregationStrategy(_stepExpressions,
_correlateByExpressions);
+ }
+
+ MergeStrategy<List<Sketch>> thetaSketchMergeStrategy() {
+ return new ThetaSketchMergeStrategy(_numSteps, _nominalEntries);
+ }
+
+ MergeStrategy<List<Set>> setMergeStrategy() {
+ return new SetMergeStrategy(_numSteps);
+ }
+
+ MergeStrategy<List<RoaringBitmap>> bitmapMergeStrategy() {
+ return new BitmapMergeStrategy(_numSteps);
+ }
+
+ MergeStrategy<List<Long>> partitionedMergeStrategy() {
+ return new PartitionedMergeStrategy(_numSteps);
+ }
+
+ ResultExtractionStrategy<UpdateSketch[], List<Sketch>>
thetaSketchResultExtractionStrategy() {
+ return new ThetaSketchResultExtractionStrategy(_numSteps);
+ }
+
+ ResultExtractionStrategy<DictIdsWrapper, List<Set>>
setResultExtractionStrategy() {
+ return new SetResultExtractionStrategy(_numSteps);
+ }
+
+ ResultExtractionStrategy<DictIdsWrapper, List<RoaringBitmap>>
bitmapResultExtractionStrategy() {
+ return new BitmapResultExtractionStrategy(_numSteps);
+ }
+
+ ResultExtractionStrategy<DictIdsWrapper, List<Long>>
bitmapPartitionedResultExtractionStrategy() {
+ final MergeStrategy<List<RoaringBitmap>> bitmapMergeStrategy =
bitmapMergeStrategy();
+ return dictIdsWrapper ->
bitmapMergeStrategy.extractFinalResult(Arrays.asList(dictIdsWrapper._stepsBitmaps));
+ }
+
+ ResultExtractionStrategy<UpdateSketch[], List<Long>>
thetaSketchPartitionedResultExtractionStrategy() {
+ final MergeStrategy<List<Sketch>> thetaSketchMergeStrategy =
thetaSketchMergeStrategy();
+ return sketches ->
thetaSketchMergeStrategy.extractFinalResult(Arrays.asList(sketches));
+ }
+
+ enum Option {
+ STEPS("steps"), CORRELATE_BY("correlateby"), SETTINGS("settings");
+
+ final String _name;
+
+ Option(String name) {
+ _name = name;
+ }
+
+ public static void validate(List<ExpressionContext> expressions) {
+ final List<String> invalidOptions = expressions.stream()
+ .filter(expression ->
!Arrays.stream(Option.values()).anyMatch(option -> option.matches(expression)))
+ .map(ExpressionContext::toString).collect(Collectors.toList());
+
+ if (!invalidOptions.isEmpty()) {
+ throw new IllegalArgumentException("Invalid FUNNELCOUNT options: " +
String.join(", ", invalidOptions));
+ }
+ }
+
+ boolean matches(ExpressionContext expression) {
+ if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+ return false;
+ }
+ return _name.equals(expression.getFunction().getFunctionName());
+ }
+
+ Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
+ return expressions.stream().filter(this::matches).findFirst();
+ }
+
+ public List<ExpressionContext> getInputExpressions(List<ExpressionContext>
expressions) {
+ final List<ExpressionContext> inputExpressions =
+ this.find(expressions).map(exp -> exp.getFunction().getArguments())
+ .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT
requires " + _name));
+ Preconditions.checkArgument(!inputExpressions.isEmpty(), "FUNNELCOUNT: "
+ _name + " requires an argument.");
+ return inputExpressions;
+ }
+
+ public List<String> getLiterals(List<ExpressionContext> expressions) {
+ List<ExpressionContext> inputExpressions =
+ find(expressions).map(exp ->
exp.getFunction().getArguments()).orElseGet(Collections::emptyList);
+ Preconditions.checkArgument(
+ inputExpressions.stream().allMatch(exp -> exp.getType() ==
ExpressionContext.Type.LITERAL),
+ "FUNNELCOUNT: " + _name + " parameters must be literals");
+ return inputExpressions.stream().map(exp ->
exp.getLiteral().getStringValue()).collect(Collectors.toList());
+ }
+ }
+
+ enum Setting {
+ SET("set"),
+ BITMAP("bitmap"),
+ PARTITIONED("partitioned"),
+ SORTED("sorted"),
+ THETA_SKETCH("theta_sketch"),
+ NOMINAL_ENTRIES("nominalEntries");
+
+ private static final char KEY_VALUE_SEPARATOR = '=';
+ final String _name;
+
+ Setting(String name) {
+ _name = name.toLowerCase();
+ }
+
+ public static void validate(List<String> settings) {
+ final List<String> invalidSettings = settings.stream().filter(param ->
!Arrays.stream(Setting.values())
+ .anyMatch(setting -> setting.matchesKV(param) ||
setting.matches(param))).collect(Collectors.toList());
+
+ if (!invalidSettings.isEmpty()) {
+ throw new IllegalArgumentException("Invalid FUNNELCOUNT SETTINGS: " +
String.join(", ", invalidSettings));
+ }
+ }
+
+ boolean matchesKV(String setting) {
+ return
StringUtils.deleteWhitespace(setting).toLowerCase().startsWith(_name +
KEY_VALUE_SEPARATOR);
+ }
+
+ boolean matches(String setting) {
+ return StringUtils.deleteWhitespace(setting).toLowerCase().equals(_name);
+ }
+
+ public Optional<String> getString(List<String> settings) {
+ return settings.stream().filter(this::matchesKV).findFirst()
+ .map(setting -> setting.substring(_name.length() + 1));
+ }
+
+ public Optional<Integer> getInteger(List<String> settings) {
+ return getString(settings).map(Integer::parseInt);
+ }
+
+ public boolean isSet(List<String> settings) {
+ return settings.stream().anyMatch(this::matches) ||
getString(settings).map(Boolean::parseBoolean).orElse(false);
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountSortedAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountSortedAggregationFunction.java
new file mode 100644
index 0000000000..ac39461cef
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountSortedAggregationFunction.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * The {@code FunnelCountSortedAggregationFunction} calculates the number of
conversions for a given correlation column
+ * and a list of steps as boolean expressions.
+ * It leverages a more efficient counting strategy for segments sorted by
correlate_by column, falls back to a regular
+ * counting strategy for unsorted segments (e.g. uncommitted segments).
+ *
+ * Example:
+ * SELECT
+ * dateTrunc('day', timestamp) AS ts,
+ * FUNNEL_COUNT(
+ * STEPS(url = '/addToCart', url = '/checkout', url =
'/orderConfirmation'),
+ * CORRELATE_BY(user_id),
+ * SETTINGS('partitioned','sorted')
+ * ) as step_counts
+ * FROM user_log
+ * WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
+ * GROUP BY 1
+ *
+ */
+public class FunnelCountSortedAggregationFunction<A> extends
FunnelCountAggregationFunction<A, List<Long>> {
+ private final ExpressionContext _primaryCorrelationCol;
+ private final AggregationStrategy<SortedAggregationResult>
_sortedAggregationStrategy;
+ private final ResultExtractionStrategy<SortedAggregationResult, List<Long>>
_sortedResultExtractionStrategy;
+
+ public FunnelCountSortedAggregationFunction(List<ExpressionContext>
expressions,
+ List<ExpressionContext> stepExpressions, List<ExpressionContext>
correlateByExpressions,
+ AggregationStrategy<A> aggregationStrategy, ResultExtractionStrategy<A,
List<Long>> resultExtractionStrategy,
+ MergeStrategy<List<Long>> mergeStrategy) {
+ super(expressions, stepExpressions, correlateByExpressions,
aggregationStrategy, resultExtractionStrategy,
+ mergeStrategy);
+ _sortedAggregationStrategy = new
SortedAggregationStrategy(stepExpressions, correlateByExpressions);
+ _sortedResultExtractionStrategy = SortedAggregationResult::extractResult;;
+ _primaryCorrelationCol = correlateByExpressions.get(0);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ if (isSortedDictionary(blockValSetMap)) {
+ _sortedAggregationStrategy.aggregate(length, aggregationResultHolder,
blockValSetMap);
+ } else {
+ super.aggregate(length, aggregationResultHolder, blockValSetMap);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ if (isSortedDictionary(blockValSetMap)) {
+ _sortedAggregationStrategy.aggregateGroupBySV(length, groupKeyArray,
groupByResultHolder, blockValSetMap);
+ } else {
+ super.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ if (isSortedDictionary(blockValSetMap)) {
+ _sortedAggregationStrategy.aggregateGroupByMV(length, groupKeysArray,
groupByResultHolder, blockValSetMap);
+ } else {
+ super.aggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
+ }
+
+ @Override
+ public List<Long> extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ if (isSortedAggResult(aggregationResultHolder.getResult())) {
+ return
_sortedResultExtractionStrategy.extractAggregationResult(aggregationResultHolder);
+ } else {
+ return super.extractAggregationResult(aggregationResultHolder);
+ }
+ }
+
+ @Override
+ public List<Long> extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ if (isSortedAggResult(groupByResultHolder.getResult(groupKey))) {
+ return
_sortedResultExtractionStrategy.extractGroupByResult(groupByResultHolder,
groupKey);
+ } else {
+ return super.extractGroupByResult(groupByResultHolder, groupKey);
+ }
+ }
+
+ private boolean isSortedDictionary(Map<ExpressionContext, BlockValSet>
blockValSetMap) {
+ return getDictionary(blockValSetMap).isSorted();
+ }
+
+ private boolean isSortedAggResult(Object aggResult) {
+ return aggResult instanceof SortedAggregationResult;
+ }
+
+ private Dictionary getDictionary(Map<ExpressionContext, BlockValSet>
blockValSetMap) {
+ final Dictionary primaryCorrelationDictionary =
blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+ Preconditions.checkArgument(primaryCorrelationDictionary != null,
+ "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported
for sorted setting, "
+ + "please use a dictionary encoded column.");
+ return primaryCorrelationDictionary;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/MergeStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/MergeStrategy.java
new file mode 100644
index 0000000000..47eaa57e6f
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/MergeStrategy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import javax.annotation.concurrent.ThreadSafe;
+
+
+/**
+ * Strategy for combining segment-level intermediate results across segments and turning the combined result into
+ * the final per-step funnel counts.
+ *
+ * <p>Implementations should hold no mutable state, so a single instance can be shared across segments and threads.
+ *
+ * @param <I> Intermediate result at segment level (extracted from aggregation strategy result).
+ */
+@ThreadSafe
+interface MergeStrategy<I> {
+
+  /**
+   * Combines two intermediate results into one.
+   *
+   * <p>NOTE(review): some implementations update and return one of the arguments in place, so callers presumably
+   * should not reuse the inputs after merging.
+   */
+  I merge(I intermediateResult1, I intermediateResult2);
+
+  /** Converts a (merged) intermediate result into the final list of counts, one entry per funnel step. */
+  LongArrayList extractFinalResult(I intermediateResult);
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/PartitionedMergeStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/PartitionedMergeStrategy.java
new file mode 100644
index 0000000000..15ea02e090
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/PartitionedMergeStrategy.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.List;
+
+
+/**
+ * Merge strategy for intermediate results that are already per-step counts: merging adds the counts step by step.
+ *
+ * <p>NOTE(review): plain summation is only exact when a correlation id never contributes to more than one
+ * intermediate result (presumably guaranteed by partitioning on the correlation column); confirm with callers.
+ */
+class PartitionedMergeStrategy implements MergeStrategy<List<Long>> {
+  protected final int _numSteps;
+
+  PartitionedMergeStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  /** Returns a fresh list holding {@code a[i] + b[i]} for each step; neither input is modified. */
+  @Override
+  public List<Long> merge(List<Long> a, List<Long> b) {
+    LongArrayList merged = toLongArrayList(a);
+    long[] counts = merged.elements();
+    for (int step = 0; step < _numSteps; step++) {
+      counts[step] = counts[step] + b.get(step);
+    }
+    return merged;
+  }
+
+  /** The counts are already final; returns them as an independent copy. */
+  @Override
+  public LongArrayList extractFinalResult(List<Long> intermediateResult) {
+    return toLongArrayList(intermediateResult);
+  }
+
+  /** Copies the list into a new {@link LongArrayList}, cloning directly when it already is one. */
+  private LongArrayList toLongArrayList(List<Long> longList) {
+    if (longList instanceof LongArrayList) {
+      return ((LongArrayList) longList).clone();
+    }
+    return new LongArrayList(longList);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ResultExtractionStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ResultExtractionStrategy.java
new file mode 100644
index 0000000000..7c35251640
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ResultExtractionStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+
+
+/**
+ * Strategy that converts a segment's accumulated aggregation result into the intermediate result sent for merging.
+ *
+ * <p>Implementations should hold no mutable state, so a single instance can be shared across segments and threads.
+ *
+ * @param <A> Aggregation result accumulated across blocks within segment, kept by result holder.
+ * @param <I> Intermediate result at segment level (extracted from aforementioned aggregation result).
+ */
+@ThreadSafe
+interface ResultExtractionStrategy<A, I> {
+
+  /**
+   * Reads the accumulated result from an aggregation-only result holder and converts it.
+   */
+  default I extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    A aggregationResult = aggregationResultHolder.getResult();
+    return extractIntermediateResult(aggregationResult);
+  }
+
+  /**
+   * Reads the accumulated result for {@code groupKey} from a group-by result holder and converts it.
+   */
+  default I extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    A aggregationResult = groupByResultHolder.getResult(groupKey);
+    return extractIntermediateResult(aggregationResult);
+  }
+
+  /** Converts one accumulated aggregation result into its intermediate form. */
+  I extractIntermediateResult(A aggregationResult);
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetMergeStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetMergeStrategy.java
new file mode 100644
index 0000000000..d5ae4bc987
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetMergeStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Merge strategy over one set of correlation values per funnel step: merging is a per-step union, and the final
+ * counts come from intersecting each step with the (already intersected) previous step.
+ *
+ * <p>Both operations work in place: {@link #merge} grows and returns the first argument's sets, and
+ * {@link #extractFinalResult} shrinks the given sets.
+ */
+class SetMergeStrategy implements MergeStrategy<List<Set>> {
+  protected final int _numSteps;
+
+  SetMergeStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  @Override
+  public List<Set> merge(List<Set> intermediateResult1, List<Set> intermediateResult2) {
+    int step = 0;
+    while (step < _numSteps) {
+      Set target = intermediateResult1.get(step);
+      target.addAll(intermediateResult2.get(step));
+      step++;
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public LongArrayList extractFinalResult(List<Set> stepsSets) {
+    long[] counts = new long[_numSteps];
+    Set previous = stepsSets.get(0);
+    counts[0] = previous.size();
+    for (int step = 1; step < _numSteps; step++) {
+      // Keep only the values that also reached every earlier step.
+      Set current = stepsSets.get(step);
+      current.retainAll(previous);
+      counts[step] = current.size();
+      previous = current;
+    }
+    return LongArrayList.wrap(counts);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetResultExtractionStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetResultExtractionStrategy.java
new file mode 100644
index 0000000000..09ad7adad8
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetResultExtractionStrategy.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Result extraction strategy that turns each step's bitmap of dictionary ids into a set of the actual correlation
+ * values, so that segments with different dictionaries can be combined with set unions/intersections.
+ */
+class SetResultExtractionStrategy implements ResultExtractionStrategy<DictIdsWrapper, List<Set>> {
+  protected final int _numSteps;
+
+  SetResultExtractionStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  @Override
+  public List<Set> extractIntermediateResult(DictIdsWrapper dictIdsWrapper) {
+    Dictionary dictionary = dictIdsWrapper._dictionary;
+    List<Set> stepValueSets = new ArrayList<>(_numSteps);
+    for (RoaringBitmap stepBitmap : dictIdsWrapper._stepsBitmaps) {
+      stepValueSets.add(convertToValueSet(dictionary, stepBitmap));
+    }
+    return stepValueSets;
+  }
+
+  /**
+   * Looks up every dictionary id of the bitmap and collects the values into a primitive-specialized set.
+   *
+   * @throws IllegalArgumentException if the dictionary's value type is not INT, LONG, FLOAT, DOUBLE or STRING
+   */
+  private Set convertToValueSet(Dictionary dictionary, RoaringBitmap dictIdBitmap) {
+    int expectedSize = dictIdBitmap.getCardinality();
+    PeekableIntIterator dictIds = dictIdBitmap.getIntIterator();
+    FieldSpec.DataType valueType = dictionary.getValueType();
+    switch (valueType) {
+      case INT: {
+        IntOpenHashSet values = new IntOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          int dictId = dictIds.next();
+          values.add(dictionary.getIntValue(dictId));
+        }
+        return values;
+      }
+      case LONG: {
+        LongOpenHashSet values = new LongOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          int dictId = dictIds.next();
+          values.add(dictionary.getLongValue(dictId));
+        }
+        return values;
+      }
+      case FLOAT: {
+        FloatOpenHashSet values = new FloatOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          int dictId = dictIds.next();
+          values.add(dictionary.getFloatValue(dictId));
+        }
+        return values;
+      }
+      case DOUBLE: {
+        DoubleOpenHashSet values = new DoubleOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          int dictId = dictIds.next();
+          values.add(dictionary.getDoubleValue(dictId));
+        }
+        return values;
+      }
+      case STRING: {
+        ObjectOpenHashSet<String> values = new ObjectOpenHashSet<>(expectedSize);
+        while (dictIds.hasNext()) {
+          int dictId = dictIds.next();
+          values.add(dictionary.getStringValue(dictId));
+        }
+        return values;
+      }
+      default:
+        throw new IllegalArgumentException("Illegal data type for FUNNEL_COUNT aggregation function: " + valueType);
+    }
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java
new file mode 100644
index 0000000000..eb773eac7e
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+
+/**
+ * Aggregation result data structure leveraged by sorted aggregation strategy.
+ *
+ * <p>For the correlation id currently being scanned it remembers which steps were seen. When a different id arrives,
+ * the finished group adds one to each step of the longest run of consecutive steps (starting at step 0) it reached.
+ * Counting is therefore only exact when all rows of a correlation id are added consecutively (e.g. sorted input).
+ */
+class SortedAggregationResult {
+  final int _numSteps;
+  final long[] _stepCounters;
+  final boolean[] _correlatedSteps;
+  int _lastCorrelationId = Integer.MIN_VALUE;
+
+  SortedAggregationResult(int numSteps) {
+    _numSteps = numSteps;
+    _stepCounters = new long[_numSteps];
+    _correlatedSteps = new boolean[_numSteps];
+  }
+
+  /** Records that {@code correlationId} reached {@code step}. */
+  public void add(int step, int correlationId) {
+    if (correlationId == _lastCorrelationId) {
+      _correlatedSteps[step] = true;
+      return;
+    }
+    // A new correlation group starts: flush the previous group's counts, then forget its steps.
+    incrStepCounters();
+    for (int n = _numSteps - 1; n >= 0; n--) {
+      _correlatedSteps[n] = false;
+    }
+    _lastCorrelationId = correlationId;
+    _correlatedSteps[step] = true;
+  }
+
+  /** Adds the currently tracked group to the counters, stopping at the first step it did not reach. */
+  void incrStepCounters() {
+    for (int n = 0; n < _numSteps && _correlatedSteps[n]; n++) {
+      _stepCounters[n]++;
+    }
+  }
+
+  /**
+   * Flushes the still-open correlation group and returns the per-step counts.
+   *
+   * <p>NOTE: not idempotent. Each call counts the open group again, and the returned list wraps the internal
+   * counter array rather than a copy.
+   */
+  public LongArrayList extractResult() {
+    incrStepCounters();
+    return LongArrayList.wrap(_stepCounters);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationStrategy.java
new file mode 100644
index 0000000000..533d8723a7
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationStrategy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Aggregation strategy for segments partitioned and sorted by the main correlation column. It streams
+ * (step, correlation id) pairs into a {@link SortedAggregationResult}, which counts one correlation group at a time.
+ */
+class SortedAggregationStrategy extends AggregationStrategy<SortedAggregationResult> {
+
+  public SortedAggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions) {
+    super(stepExpressions, correlateByExpressions);
+  }
+
+  /** Creates an empty per-step counter; the dictionary is not needed for it. */
+  @Override
+  public SortedAggregationResult createAggregationResult(Dictionary dictionary) {
+    return new SortedAggregationResult(_numSteps);
+  }
+
+  /** Forwards the dictionary id straight to the result; ids are compared, never resolved to values. */
+  @Override
+  void add(Dictionary dictionary, SortedAggregationResult aggResult, int step, int correlationId) {
+    aggResult.add(step, correlationId);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchAggregationStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchAggregationStrategy.java
new file mode 100644
index 0000000000..a2ac25f867
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchAggregationStrategy.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.List;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Aggregation strategy leveraging theta sketch algebra (unions/intersections): each funnel step keeps one update
+ * sketch of the correlation values seen for that step.
+ */
+class ThetaSketchAggregationStrategy extends AggregationStrategy<UpdateSketch[]> {
+  final UpdateSketchBuilder _updateSketchBuilder;
+
+  public ThetaSketchAggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions, int nominalEntries) {
+    super(stepExpressions, correlateByExpressions);
+    _updateSketchBuilder = new UpdateSketchBuilder().setNominalEntries(nominalEntries);
+  }
+
+  /** Creates one empty update sketch per funnel step; the dictionary is not needed for it. */
+  @Override
+  public UpdateSketch[] createAggregationResult(Dictionary dictionary) {
+    final UpdateSketch[] stepsSketches = new UpdateSketch[_numSteps];
+    for (int n = 0; n < _numSteps; n++) {
+      stepsSketches[n] = _updateSketchBuilder.build();
+    }
+    return stepsSketches;
+  }
+
+  /**
+   * Resolves the dictionary id to its value and adds that value to the sketch of the given step.
+   *
+   * @throws IllegalStateException if the CORRELATE_BY column's value type is not INT, LONG, FLOAT, DOUBLE or STRING
+   */
+  @Override
+  void add(Dictionary dictionary, UpdateSketch[] stepsSketches, int step, int correlationId) {
+    final UpdateSketch sketch = stepsSketches[step];
+    switch (dictionary.getValueType()) {
+      case INT:
+        sketch.update(dictionary.getIntValue(correlationId));
+        break;
+      case LONG:
+        sketch.update(dictionary.getLongValue(correlationId));
+        break;
+      case FLOAT:
+        // The float is widened to double for the sketch update.
+        sketch.update(dictionary.getFloatValue(correlationId));
+        break;
+      case DOUBLE:
+        sketch.update(dictionary.getDoubleValue(correlationId));
+        break;
+      case STRING:
+        sketch.update(dictionary.getStringValue(correlationId));
+        break;
+      default:
+        // The SQL clause is CORRELATE_BY (not CORRELATED_BY); name it as users write it.
+        throw new IllegalStateException("Illegal CORRELATE_BY column data type for FUNNEL_COUNT aggregation function: "
+            + dictionary.getValueType());
+    }
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchMergeStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchMergeStrategy.java
new file mode 100644
index 0000000000..80378d7078
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchMergeStrategy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.datasketches.theta.Intersection;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+
+
+/**
+ * Merge strategy over one theta sketch per funnel step: merging is a per-step sketch union, and the final counts are
+ * rounded estimates of running intersections (step n intersected with the result for step n - 1).
+ */
+class ThetaSketchMergeStrategy implements MergeStrategy<List<Sketch>> {
+  protected final int _numSteps;
+  final SetOperationBuilder _setOperationBuilder;
+
+  ThetaSketchMergeStrategy(int numSteps, int nominalEntries) {
+    _numSteps = numSteps;
+    _setOperationBuilder = new SetOperationBuilder().setNominalEntries(nominalEntries);
+  }
+
+  /** Returns a new list of per-step unions; the input sketches are left untouched. */
+  @Override
+  public List<Sketch> merge(List<Sketch> sketches1, List<Sketch> sketches2) {
+    final List<Sketch> mergedSketches = new ArrayList<>(_numSteps);
+    for (int step = 0; step < _numSteps; step++) {
+      Sketch left = sketches1.get(step);
+      Sketch right = sketches2.get(step);
+      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
+      // See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
+      mergedSketches.add(_setOperationBuilder.buildUnion().union(left, right, false, null));
+    }
+    return mergedSketches;
+  }
+
+  @Override
+  public LongArrayList extractFinalResult(List<Sketch> sketches) {
+    long[] estimates = new long[_numSteps];
+    Sketch running = sketches.get(0);
+    estimates[0] = Math.round(running.getEstimate());
+    for (int step = 1; step < _numSteps; step++) {
+      // A fresh intersection per step: keep only what also reached every earlier step.
+      running = _setOperationBuilder.buildIntersection().intersect(running, sketches.get(step));
+      estimates[step] = Math.round(running.getEstimate());
+    }
+    return LongArrayList.wrap(estimates);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchResultExtractionStrategy.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchResultExtractionStrategy.java
new file mode 100644
index 0000000000..2c794271bf
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchResultExtractionStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+
+
+/**
+ * Result extraction strategy that exposes the per-step update sketches as the intermediate result. A {@code null}
+ * aggregation result (presumably a segment or group that never received a row; TODO confirm) becomes a list of
+ * {@code _numSteps} empty compact sketches.
+ */
+class ThetaSketchResultExtractionStrategy implements ResultExtractionStrategy<UpdateSketch[], List<Sketch>> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
+
+  protected final int _numSteps;
+
+  ThetaSketchResultExtractionStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  @Override
+  public List<Sketch> extractIntermediateResult(UpdateSketch[] stepsSketches) {
+    if (stepsSketches != null) {
+      return Arrays.asList(stepsSketches);
+    }
+    return Collections.nCopies(_numSteps, EMPTY_SKETCH);
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
index ef5c7d596f..c1a966a9b5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.queries;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -87,8 +86,12 @@ abstract public class BaseFunnelCountQueriesTest extends
BaseQueriesTest {
private List<IndexSegment> _indexSegments;
protected abstract int getExpectedNumEntriesScannedInFilter();
+ /**
+  * Factor applied to the single-segment expected counts to obtain the expected inter-segment (broker) counts.
+  */
+ protected abstract int getExpectedInterSegmentMultiplier();
protected abstract TableConfig getTableConfig();
protected abstract IndexSegment buildSegment(List<GenericRow> records)
throws Exception;
+ /**
+  * Asserts the segment-level intermediate result against the expected per-step counts. The shape of the
+  * intermediate result depends on the strategy selected by {@link #getSettings()}.
+  */
+ protected abstract void assertIntermediateResult(Object intermediateResult,
long[] expectedCounts);
+
+ /** Returns the SETTINGS(...) argument appended to the FUNNEL_COUNT call under test. */
+ protected abstract String getSettings();
@Override
protected String getFilter() {
@@ -132,13 +135,16 @@ abstract public class BaseFunnelCountQueriesTest extends
BaseQueriesTest {
return records;
}
+  /** Builds the FUNNEL_COUNT expression under test, parameterized by the subclass-specific SETTINGS clause. */
+  private String getFunnelCountSql() {
+    String steps = "STEPS(stepColumn = 'A', stepColumn = 'B'), ";
+    String correlateBy = "CORRELATE_BY(idColumn), ";
+    return "FUNNEL_COUNT( " + steps + correlateBy + getSettings() + ") ";
+  }
@Test
public void testAggregationOnly() {
- String query = String.format("SELECT "
- + "FUNNEL_COUNT("
- + " STEPS(stepColumn = 'A', stepColumn = 'B'),"
- + " CORRELATE_BY(idColumn)"
- + ") FROM testTable");
+ String query = String.format("SELECT " + getFunnelCountSql() + "FROM
testTable");
// Inner segment
Predicate<Integer> filter = id -> id >= FILTER_LIMIT;
@@ -157,13 +163,11 @@ abstract public class BaseFunnelCountQueriesTest extends
BaseQueriesTest {
List<Object> aggregationResult = resultsBlock.getResults();
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 1);
- for (int i = 0; i < 2; i++) {
- assertEquals(((LongArrayList) aggregationResult.get(0)).getLong(i),
expectedResult[i]);
- }
+ assertIntermediateResult(aggregationResult.get(0), expectedResult);
- // Inter segments (expect 4 * inner segment result)
+ // Inter segments
for (int i = 0; i < 2; i++) {
- expectedResult[i] = 4 * expectedResult[i];
+ expectedResult[i] = expectedResult[i] *
getExpectedInterSegmentMultiplier();
}
Object[] expectedResults = { expectedResult };
@@ -176,10 +180,8 @@ abstract public class BaseFunnelCountQueriesTest extends
BaseQueriesTest {
public void testAggregationGroupBy() {
String query = String.format("SELECT "
+ "MOD(idColumn, %s), "
- + "FUNNEL_COUNT("
- + " STEPS(stepColumn = 'A', stepColumn = 'B'),"
- + " CORRELATE_BY(idColumn)"
- + ") FROM testTable "
+ + getFunnelCountSql()
+ + "FROM testTable "
+ "WHERE idColumn >= %s "
+ "GROUP BY 1 ORDER BY 1 LIMIT %s", NUM_GROUPS, FILTER_LIMIT,
NUM_GROUPS);
@@ -219,19 +221,20 @@ abstract public class BaseFunnelCountQueriesTest extends
BaseQueriesTest {
numGroups++;
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
int key = ((Double) groupKey._keys[0]).intValue();
- assertEquals(aggregationGroupByResult.getResultForGroupId(0,
groupKey._groupId),
- new LongArrayList(expectedResult[key]));
+ assertIntermediateResult(
+ aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId),
+ expectedResult[key]);
}
assertEquals(numGroups, expectedNumGroups);
- // Inter segments (expect 4 * inner segment result)
+ // Inter segments
List<Object[]> expectedRows = new ArrayList<>();
for (int i = 0; i < NUM_GROUPS; i++) {
if (expectedResult[i] == null) {
continue;
}
for (int step = 0; step < 2; step++) {
- expectedResult[i][step] = 4 * expectedResult[i][step];
+ expectedResult[i][step] = expectedResult[i][step] *
getExpectedInterSegmentMultiplier();
}
Object[] expectedRow = { Double.valueOf(i), expectedResult[i] };
expectedRows.add(expectedRow);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesBitmapTest.java
similarity index 58%
copy from
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
copy to
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesBitmapTest.java
index c89a5d74c9..1e36838f3b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesBitmapTest.java
@@ -25,19 +25,33 @@ import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.RoaringBitmap;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
/**
* Queries test for FUNNEL_COUNT queries.
*/
@SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesBitmapTest extends BaseFunnelCountQueriesTest {
+
+  /** Selects the bitmap strategy for FUNNEL_COUNT. */
+  @Override
+  protected String getSettings() {
+    final String bitmapSettings = "SETTINGS('bitmap')";
+    return bitmapSettings;
+  }
@Override
protected int getExpectedNumEntriesScannedInFilter() {
return NUM_RECORDS;
}
+  /**
+   * The bitmap strategy de-duplicates correlation ids across segments (presumably via unions at merge time), so
+   * repeated ids in the other segments do not inflate the counts: the expected result is unchanged.
+   */
+  @Override
+  protected int getExpectedInterSegmentMultiplier() {
+    final int noScaling = 1;
+    return noScaling;
+  }
+
@Override
protected TableConfig getTableConfig() {
return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +60,24 @@ public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest
@Override
protected IndexSegment buildSegment(List<GenericRow> records)
throws Exception {
- MutableSegment mutableSegment = MutableSegmentImplTestUtils
- .createMutableSegmentImpl(SCHEMA, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
- false);
+ MutableSegment mutableSegment =
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA,
Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), false);
for (GenericRow record : records) {
mutableSegment.index(record, null);
}
return mutableSegment;
}
+
+  /**
+   * Checks the segment-level intermediate result of the bitmap strategy: one bitmap of correlation ids per step.
+   *
+   * <p>Step bitmaps are not intersected yet at this stage. Only the first step must match exactly; every later step
+   * must hold at least as many ids as its expected (post-intersection) count.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof List);
+    List<RoaringBitmap> bitmaps = (List<RoaringBitmap>) intermediateResult;
+    // Compare exact long cardinalities: Math.round(int) resolved to round(float), a lossy int->float round-trip.
+    assertEquals(bitmaps.get(0).getLongCardinality(), expectedCounts[0]);
+    for (int i = 1; i < bitmaps.size(); i++) {
+      assertTrue(bitmaps.get(i).getLongCardinality() >= expectedCounts[i]);
+    }
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSortedTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedSortedTest.java
similarity index 79%
rename from
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSortedTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedSortedTest.java
index f06fe26637..a453886050 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSortedTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedSortedTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.queries;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.io.File;
import java.util.Comparator;
import java.util.List;
@@ -30,18 +31,31 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ReadMode;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Queries test for FUNNEL_COUNT queries using sorted strategy.
*/
@SuppressWarnings("rawtypes")
-public class FunnelCountQueriesSortedTest extends BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesPartitionedSortedTest extends
BaseFunnelCountQueriesTest {
+
+ /**
+  * Returns the SETTINGS clause selecting the 'partitioned' and 'sorted' options for this test variant.
+  * NOTE(review): presumably spliced into the FUNNEL_COUNT query by BaseFunnelCountQueriesTest — confirm.
+  */
+ @Override
+ protected String getSettings() {
+ return "SETTINGS('partitioned', 'sorted')";
+ }
@Override
protected int getExpectedNumEntriesScannedInFilter() {
return 0;
}
+ /**
+  * Expected inter-segment multiplier for the partitioned + sorted variant (4).
+  * NOTE(review): the semantics live in BaseFunnelCountQueriesTest (not visible here); presumably the
+  * factor by which inter-segment counts scale when per-segment counts are added rather than unioned — confirm.
+  */
+ @Override
+ protected int getExpectedInterSegmentMultiplier() {
+ return 4;
+ }
+
@Override
protected TableConfig getTableConfig() {
return TABLE_CONFIG_BUILDER.setSortedColumn(ID_COLUMN).build();
@@ -62,4 +76,10 @@ public class FunnelCountQueriesSortedTest extends
BaseFunnelCountQueriesTest {
driver.build();
return ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME),
ReadMode.mmap);
}
+
+  /**
+   * Checks the partitioned intermediate result: a list of per-step counts that must equal the expected counts exactly.
+   */
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof LongArrayList);
+    // elements() exposes the backing array, which may be longer than size(); compare an exact-size copy.
+    assertEquals(((LongArrayList) intermediateResult).toLongArray(), expectedCounts);
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedTest.java
similarity index 66%
copy from
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
copy to
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedTest.java
index c89a5d74c9..da3f436b58 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.queries;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.util.Collections;
import java.util.List;
import
org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImplTestUtils;
@@ -26,18 +27,31 @@ import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Queries test for FUNNEL_COUNT queries.
*/
@SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesPartitionedTest extends
BaseFunnelCountQueriesTest {
+
+ /**
+  * Returns the SETTINGS clause selecting the 'partitioned' option for this test variant.
+  * NOTE(review): presumably spliced into the FUNNEL_COUNT query by BaseFunnelCountQueriesTest — confirm.
+  */
+ @Override
+ protected String getSettings() {
+ return "SETTINGS('partitioned')";
+ }
@Override
protected int getExpectedNumEntriesScannedInFilter() {
return NUM_RECORDS;
}
+ /**
+  * Expected inter-segment multiplier for the partitioned variant (4).
+  * NOTE(review): the semantics live in BaseFunnelCountQueriesTest (not visible here); presumably the
+  * factor by which inter-segment counts scale when per-segment counts are added rather than unioned — confirm.
+  */
+ @Override
+ protected int getExpectedInterSegmentMultiplier() {
+ return 4;
+ }
+
@Override
protected TableConfig getTableConfig() {
return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +60,18 @@ public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest
@Override
protected IndexSegment buildSegment(List<GenericRow> records)
throws Exception {
- MutableSegment mutableSegment = MutableSegmentImplTestUtils
- .createMutableSegmentImpl(SCHEMA, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
- false);
+ MutableSegment mutableSegment =
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA,
Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), false);
for (GenericRow record : records) {
mutableSegment.index(record, null);
}
return mutableSegment;
}
+
+  /**
+   * Checks the partitioned intermediate result: a list of per-step counts that must equal the expected counts exactly.
+   */
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof LongArrayList);
+    // elements() exposes the backing array, which may be longer than size(); compare an exact-size copy.
+    assertEquals(((LongArrayList) intermediateResult).toLongArray(), expectedCounts);
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSetTest.java
similarity index 60%
copy from
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
copy to
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSetTest.java
index c89a5d74c9..b9361bc618 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSetTest.java
@@ -20,24 +20,38 @@ package org.apache.pinot.queries;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import
org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImplTestUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Queries test for FUNNEL_COUNT queries.
*/
@SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesSetTest extends BaseFunnelCountQueriesTest {
+
+ /**
+  * Returns the SETTINGS clause selecting the 'set' option for this test variant.
+  * NOTE(review): presumably spliced into the FUNNEL_COUNT query by BaseFunnelCountQueriesTest — confirm.
+  */
+ @Override
+ protected String getSettings() {
+ return "SETTINGS('set')";
+ }
@Override
protected int getExpectedNumEntriesScannedInFilter() {
return NUM_RECORDS;
}
+ /**
+  * Expected inter-segment multiplier for the set variant (1).
+  * NOTE(review): the semantics live in BaseFunnelCountQueriesTest (not visible here) — confirm.
+  */
+ @Override
+ protected int getExpectedInterSegmentMultiplier() {
+ return 1;
+ }
+
@Override
protected TableConfig getTableConfig() {
return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +60,24 @@ public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest
@Override
protected IndexSegment buildSegment(List<GenericRow> records)
throws Exception {
- MutableSegment mutableSegment = MutableSegmentImplTestUtils
- .createMutableSegmentImpl(SCHEMA, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
- false);
+ MutableSegment mutableSegment =
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA,
Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), false);
for (GenericRow record : records) {
mutableSegment.index(record, null);
}
return mutableSegment;
}
+
+  /**
+   * Checks the set-strategy intermediate result: one Set per funnel step.
+   * Step 0 must match exactly; later steps are not intersected yet, so they are only lower-bounded.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof List);
+    List<Set<?>> sets = (List<Set<?>>) intermediateResult;
+    // One set per step; otherwise the loop below would silently skip steps or overrun expectedCounts.
+    assertEquals(sets.size(), expectedCounts.length);
+    // First step needs no intersection, so its size must match exactly.
+    assertEquals(sets.get(0).size(), expectedCounts[0]);
+    for (int i = 1; i < sets.size(); i++) {
+      // Sets are yet to be intersected; each must be at least the expected count at this step.
+      assertTrue(sets.get(i).size() >= expectedCounts[i]);
+    }
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesThetaSketchTest.java
similarity index 58%
rename from
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesThetaSketchTest.java
index c89a5d74c9..ef55153280 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesThetaSketchTest.java
@@ -20,24 +20,33 @@ package org.apache.pinot.queries;
import java.util.Collections;
import java.util.List;
+import org.apache.datasketches.theta.Sketch;
import
org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImplTestUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Queries test for FUNNEL_COUNT queries.
*/
@SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesThetaSketchTest extends
BaseFunnelCountQueriesTest {
@Override
protected int getExpectedNumEntriesScannedInFilter() {
return NUM_RECORDS;
}
+ /**
+  * Expected inter-segment multiplier for the theta-sketch variant (1).
+  * NOTE(review): the semantics live in BaseFunnelCountQueriesTest (not visible here) — confirm.
+  */
+ @Override
+ protected int getExpectedInterSegmentMultiplier() {
+ return 1;
+ }
+
@Override
protected TableConfig getTableConfig() {
return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +55,29 @@ public class FunnelCountQueriesNonSortedTest extends
BaseFunnelCountQueriesTest
@Override
protected IndexSegment buildSegment(List<GenericRow> records)
throws Exception {
- MutableSegment mutableSegment = MutableSegmentImplTestUtils
- .createMutableSegmentImpl(SCHEMA, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
- false);
+ MutableSegment mutableSegment =
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA,
Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), false);
for (GenericRow record : records) {
mutableSegment.index(record, null);
}
return mutableSegment;
}
+
+  /**
+   * Checks the theta-sketch intermediate result: one Sketch per funnel step.
+   * Step 0's rounded estimate must match exactly; later steps are not intersected yet, so they are only lower-bounded.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof List);
+    List<Sketch> sketches = (List<Sketch>) intermediateResult;
+    // One sketch per step; otherwise the loop below would silently skip steps or overrun expectedCounts.
+    assertEquals(sketches.size(), expectedCounts.length);
+    // First step needs no intersection; getEstimate() is a double, so round before comparing.
+    assertEquals(Math.round(sketches.get(0).getEstimate()), expectedCounts[0]);
+    for (int i = 1; i < sketches.size(); i++) {
+      // Sketches are yet to be intersected; each estimate must be at least the expected count at this step.
+      assertTrue(Math.round(sketches.get(i).getEstimate()) >= expectedCounts[i]);
+    }
+  }
+
+ /**
+  * Returns the SETTINGS clause selecting the 'theta_sketch' option for this test variant.
+  * NOTE(review): presumably spliced into the FUNNEL_COUNT query by BaseFunnelCountQueriesTest — confirm.
+  */
+ @Override
+ protected String getSettings() {
+ return "SETTINGS('theta_sketch')";
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]