This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 767e3cddd6 Set groupsTrimmed result flag when data is trimmed. (#16220)
767e3cddd6 is described below
commit 767e3cddd6472bb12307af3e78f82d471be1559d
Author: Bolek Ziobrowski <[email protected]>
AuthorDate: Wed Jul 2 14:19:16 2025 +0200
Set groupsTrimmed result flag when data is trimmed. (#16220)
---
.../apache/pinot/broker/querylog/QueryLogger.java | 6 +
.../requesthandler/BaseBrokerRequestHandler.java | 1 +
.../BaseSingleStageBrokerRequestHandler.java | 5 +-
.../MultiStageBrokerRequestHandler.java | 6 +
.../pinot/broker/querylog/QueryLoggerTest.java | 1 +
.../org/apache/pinot/client/ExecutionStats.java | 6 +
.../apache/pinot/client/ExecutionStatsTest.java | 11 +-
.../apache/pinot/common/datatable/DataTable.java | 5 +-
.../apache/pinot/common/metrics/BrokerMeter.java | 4 +
.../apache/pinot/common/metrics/ServerMeter.java | 6 +
.../pinot/common/response/BrokerResponse.java | 5 +
.../response/broker/BrokerResponseNative.java | 12 +-
.../response/broker/BrokerResponseNativeV2.java | 12 +-
.../response/broker/CursorResponseNative.java | 2 +
.../apache/pinot/core/data/table/IndexedTable.java | 7 +
.../blocks/results/GroupByResultsBlock.java | 12 +
.../operator/combine/GroupByCombineOperator.java | 9 +
.../operator/query/FilteredGroupByOperator.java | 4 +
.../pinot/core/operator/query/GroupByOperator.java | 4 +
.../query/reduce/ExecutionStatsAggregator.java | 3 +
.../core/query/reduce/GroupByDataTableReducer.java | 4 +
.../core/query/request/context/QueryContext.java | 47 ++
...erSegmentAggregationSingleValueQueriesTest.java | 30 +
.../GroupByEnableTrimOptionIntegrationTest.java | 2 +-
.../tests/GroupByOptionsIntegrationTest.java | 39 +-
.../tests/GroupByTrimmingIntegrationTest.java | 789 +++++++++++++++++++++
.../query/runtime/operator/AggregateOperator.java | 5 +
.../pinot/query/runtime/operator/LeafOperator.java | 4 +
.../query/runtime/operator/MultiStageOperator.java | 1 +
.../operator/MultistageGroupByExecutor.java | 17 +-
.../pinot/spi/trace/DefaultRequestContext.java | 11 +
.../org/apache/pinot/spi/trace/RequestContext.java | 4 +
32 files changed, 1060 insertions(+), 14 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
index 5edcc24570..61359e2b3f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
@@ -245,6 +245,12 @@ public class QueryLogger {
.append(params._response.getNumServersQueried());
}
},
+ GROUPS_TRIMMED("groupsTrimmed") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams
params) {
+ builder.append(params._response.isGroupsTrimmed());
+ }
+ },
GROUP_LIMIT_REACHED("groupLimitReached") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams
params) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index e990fc3945..895102dc61 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -302,6 +302,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
statistics.setProcessingExceptions(processingExceptions);
statistics.setNumExceptions(numExceptions);
+ statistics.setGroupsTrimmed(response.isGroupsTrimmed());
statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached());
statistics.setProcessingTimeMillis(response.getTimeUsedMs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index e3184f83d0..b30a3f7a9a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -811,6 +811,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
_brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
1);
}
+ if (brokerResponse.isGroupsTrimmed()) {
+ _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1);
+ }
// server returns STRING as default dataType for all columns in (some)
scenarios where no rows are returned
// this is an attempt to return more faithful information based on other
sources
@@ -1904,7 +1907,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
* TODO: come up with other criteria for forcing a log and come up with
better numbers
*/
private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
- if (brokerResponse.isNumGroupsLimitReached()) {
+ if (brokerResponse.isNumGroupsLimitReached() ||
brokerResponse.isGroupsTrimmed()) {
return true;
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 78513ef01c..81bfe277d8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -595,6 +595,12 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
}
+ if (brokerResponse.isGroupsTrimmed()) {
+ for (String table : tableNames) {
+ _brokerMetrics.addMeteredTableValue(table,
BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1);
+ }
+ }
+
// Set total query processing time
// TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
long totalTimeMs = System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis();
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
index 141ff3949e..456b99ec8a 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
@@ -104,6 +104,7 @@ public class QueryLoggerTest {
+ ":5/6/7/8/9/10/21,"
+ "consumingFreshnessTimeMs=11,"
+ "servers=12/13,"
+ + "groupsTrimmed=false,"
+ "groupLimitReached=false,"
+ "groupWarningLimitReached=false,"
+ "brokerReduceTimeMs=20,"
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
index 61b0029714..b2d2ef7919 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
@@ -43,6 +43,7 @@ public class ExecutionStats {
private static final String NUM_CONSUMING_SEGMENTS_QUERIED =
"numConsumingSegmentsQueried";
private static final String MIN_CONSUMING_FRESHNESS_TIME_MS =
"minConsumingFreshnessTimeMs";
private static final String TOTAL_DOCS = "totalDocs";
+ private static final String GROUPS_TRIMMED = "groupsTrimmed";
private static final String NUM_GROUPS_LIMIT_REACHED =
"numGroupsLimitReached";
private static final String NUM_GROUPS_WARNING_LIMIT_REACHED =
"numGroupsWarningLimitReached";
private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs";
@@ -112,6 +113,10 @@ public class ExecutionStats {
return _brokerResponse.has(NUM_GROUPS_LIMIT_REACHED) &&
_brokerResponse.get(NUM_GROUPS_LIMIT_REACHED).asBoolean();
}
+ public boolean isGroupsTrimmed() {
+ return _brokerResponse.has(GROUPS_TRIMMED) &&
_brokerResponse.get(GROUPS_TRIMMED).asBoolean();
+ }
+
public boolean isNumGroupsWarningLimitReached() {
return _brokerResponse.has(NUM_GROUPS_WARNING_LIMIT_REACHED)
&& _brokerResponse.get(NUM_GROUPS_WARNING_LIMIT_REACHED).asBoolean();
@@ -143,6 +148,7 @@ public class ExecutionStats {
map.put(NUM_CONSUMING_SEGMENTS_QUERIED, getNumConsumingSegmentsQueried());
map.put(MIN_CONSUMING_FRESHNESS_TIME_MS, getMinConsumingFreshnessTimeMs()
+ "ms");
map.put(TOTAL_DOCS, getTotalDocs());
+ map.put(GROUPS_TRIMMED, isGroupsTrimmed());
map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached());
map.put(NUM_GROUPS_WARNING_LIMIT_REACHED,
isNumGroupsWarningLimitReached());
map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms");
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
index 583a28e5ff..3757566c83 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
@@ -43,7 +43,7 @@ public class ExecutionStatsTest {
+ "\"numEntriesScannedInFilter\":10,
\"numEntriesScannedPostFilter\":10, \"numSegmentsQueried\":10, "
+ "\"numSegmentsProcessed\":10, \"numSegmentsMatched\":10,
\"numConsumingSegmentsQueried\":10, "
+ "\"minConsumingFreshnessTimeMs\":10, \"totalDocs\":10,
\"numGroupsLimitReached\":true, "
- + "\"timeUsedMs\":10}";
+ + "\"timeUsedMs\":10, \"groupsTrimmed\": true}";
_mockBrokerResponse = JsonUtils.stringToJsonNode(json);
_executionStatsUnderTest = new ExecutionStats(_mockBrokerResponse);
}
@@ -156,6 +156,15 @@ public class ExecutionStatsTest {
assertTrue(result);
}
+ @Test
+ public void testIsGroupsTrimmed() {
+ // Run the test
+ final boolean result = _executionStatsUnderTest.isGroupsTrimmed();
+
+ // Verify the results
+ assertTrue(result);
+ }
+
@Test
public void testGetTimeUsedMs() {
// Run the test
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index bd39b76fd9..9f4040084f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -146,11 +146,12 @@ public interface DataTable {
THREAD_MEM_ALLOCATED_BYTES(36, "threadMemAllocatedBytes",
MetadataValueType.LONG),
RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes",
MetadataValueType.LONG),
// NOTE: for server after release 1.3.0 this flag is always set to true
since servers now perform sorting
- SORTED(38, "sorted", MetadataValueType.BOOLEAN);
+ SORTED(38, "sorted", MetadataValueType.BOOLEAN),
+ GROUPS_TRIMMED(39, "groupsTrimmed", MetadataValueType.STRING);
// We keep this constant to track the max id added so far for backward
compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
- private static final int MAX_ID = 38;
+ private static final int MAX_ID = GROUPS_TRIMMED.getId();
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new
MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new
HashMap<>();
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 9c0010b474..a87784a554 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -152,6 +152,10 @@ public class BrokerMeter implements AbstractMetrics.Meter {
public static final BrokerMeter
SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS = create(
"SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS", "badResponses",
false);
+ // This metric tracks the number of broker responses with trimmed groups
(potential bad responses).
+ public static final BrokerMeter BROKER_RESPONSES_WITH_GROUPS_TRIMMED =
create(
+ "BROKER_RESPONSES_WITH_GROUPS_TRIMMED", "badResponses", false);
+
// This metric track the number of broker responses with number of groups
limit reached (potential bad responses).
public static final BrokerMeter
BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create(
"BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 1c56532b55..8a73029ac2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -150,6 +150,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
* But if a single query has 2 different join operators and each one reaches
the limit, this will be increased by 2.
*/
HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true),
+ /**
+ * Number of times group by results were trimmed.
+ * It is increased by one for each worker that reaches the limit within the
stage.
+ * That means that if a stage has 10 workers and all of them reach the
limit, this will be increased by 10.
+ */
+ AGGREGATE_TIMES_GROUPS_TRIMMED("times", true),
/**
* Number of times the max number of groups has been reached.
* It is increased in one by each worker that reaches the limit within the
stage.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index dfecbdf808..0672b7d847 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -126,6 +126,11 @@ public interface BrokerResponse {
return getExceptions().size();
}
+ /**
+ * Returns whether groups were trimmed (reduced in size after sorting).
+ */
+ boolean isGroupsTrimmed();
+
/**
* Returns whether the number of groups limit has been reached.
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 60a035ead6..23cbbdcf40 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -56,7 +56,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
"explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes",
"offlineResponseSerMemAllocatedBytes",
"realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes",
"realtimeTotalMemAllocatedBytes",
- "pools", "rlsFiltersApplied"
+ "pools", "rlsFiltersApplied", "groupsTrimmed"
})
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerResponseNative implements BrokerResponse {
@@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse {
private ResultTable _resultTable;
private int _numRowsResultSet = 0;
private List<QueryProcessingException> _exceptions = new ArrayList<>();
+ private boolean _groupsTrimmed = false;
private boolean _numGroupsLimitReached = false;
private boolean _numGroupsWarningLimitReached = false;
private long _timeUsedMs = 0L;
@@ -205,6 +206,15 @@ public class BrokerResponseNative implements
BrokerResponse {
_exceptions.add(exception);
}
+ @Override
+ public boolean isGroupsTrimmed() {
+ return _groupsTrimmed;
+ }
+
+ public void setGroupsTrimmed(boolean groupsTrimmed) {
+ _groupsTrimmed = groupsTrimmed;
+ }
+
@Override
public boolean isNumGroupsLimitReached() {
return _numGroupsLimitReached;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 0926a811c0..d8e10f3745 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -51,7 +51,7 @@ import org.apache.pinot.common.response.ProcessingException;
"explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes",
"offlineResponseSerMemAllocatedBytes",
"realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes",
"realtimeTotalMemAllocatedBytes",
- "pools", "rlsFiltersApplied"
+ "pools", "rlsFiltersApplied", "groupsTrimmed"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -126,11 +126,20 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
addException(new QueryProcessingException(exception.getErrorCode(),
exception.getMessage()));
}
+ @Override
+ public boolean isGroupsTrimmed() {
+ return _brokerStats.getBoolean(StatKey.GROUPS_TRIMMED);
+ }
+
@Override
public boolean isNumGroupsLimitReached() {
return _brokerStats.getBoolean(StatKey.NUM_GROUPS_LIMIT_REACHED);
}
+ public void mergeGroupsTrimmed(boolean groupsTrimmed) {
+ _brokerStats.merge(StatKey.GROUPS_TRIMMED, groupsTrimmed);
+ }
+
public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) {
_brokerStats.merge(StatKey.NUM_GROUPS_LIMIT_REACHED,
numGroupsLimitReached);
}
@@ -444,6 +453,7 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT),
NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT),
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
+ GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
index 5a5d2516e8..e1ba761767 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.response.CursorResponse;
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs",
"explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tableQueries",
+ "groupsTrimmed",
// Fields specific to CursorResponse
"offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs",
"submissionTimeMs", "expirationTimeMs",
"brokerHost", "brokerPort", "bytesWritten"
@@ -57,6 +58,7 @@ public class CursorResponseNative extends
BrokerResponseNative implements Cursor
setResultTable(response.getResultTable());
setNumRowsResultSet(response.getNumRowsResultSet());
setExceptions(response.getExceptions());
+ setGroupsTrimmed(response.isGroupsTrimmed());
setNumGroupsLimitReached(response.isNumGroupsLimitReached());
setNumGroupsWarningLimitReached(response.isNumGroupsWarningLimitReached());
setTimeUsedMs(response.getTimeUsedMs());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index d96854ccb5..9ce27567d0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -270,6 +270,13 @@ public abstract class IndexedTable extends BaseTable {
return _numResizes;
}
+ public boolean isTrimmed() {
+ // single resize occurs on finish() if there's orderBy
+ // all other re-sizes are triggered by trim size and threshold
+ int min = _topRecords != null && _hasOrderBy ? 1 : 0;
+ return _numResizes > min;
+ }
+
public long getResizeTimeMs() {
return TimeUnit.NANOSECONDS.toMillis(_resizeTimeNs);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 425bb2052c..48ff191c2d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -61,6 +61,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
private final Table _table;
private final QueryContext _queryContext;
+ private boolean _groupsTrimmed;
private boolean _numGroupsLimitReached;
private boolean _numGroupsWarningLimitReached;
private int _numResizes;
@@ -142,6 +143,14 @@ public class GroupByResultsBlock extends BaseResultsBlock {
_numGroupsLimitReached = numGroupsLimitReached;
}
+ public boolean isGroupsTrimmed() {
+ return _groupsTrimmed;
+ }
+
+ public void setGroupsTrimmed(boolean groupsTrimmed) {
+ _groupsTrimmed = groupsTrimmed;
+ }
+
public boolean isNumGroupsWarningLimitReached() {
return _numGroupsWarningLimitReached;
}
@@ -336,6 +345,9 @@ public class GroupByResultsBlock extends BaseResultsBlock {
@Override
public Map<String, String> getResultsMetadata() {
Map<String, String> metadata = super.getResultsMetadata();
+ if (_groupsTrimmed) {
+ metadata.put(MetadataKey.GROUPS_TRIMMED.getName(), "true");
+ }
if (_numGroupsLimitReached) {
metadata.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index fb34278b12..18e67b9899 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -66,6 +66,7 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
private final CountDownLatch _operatorLatch;
private volatile IndexedTable _indexedTable;
+ private volatile boolean _groupsTrimmed;
private volatile boolean _numGroupsLimitReached;
private volatile boolean _numGroupsWarningLimitReached;
@@ -120,6 +121,9 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
}
}
+ if (resultsBlock.isGroupsTrimmed()) {
+ _groupsTrimmed = true;
+ }
// Set groups limit reached flag.
if (resultsBlock.isNumGroupsLimitReached()) {
_numGroupsLimitReached = true;
@@ -222,6 +226,10 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
return new ExceptionResultsBlock(errMsg);
}
+ if (_indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) {
+ _groupsTrimmed = true;
+ }
+
IndexedTable indexedTable = _indexedTable;
if (_queryContext.isServerReturnFinalResult()) {
indexedTable.finish(true, true);
@@ -231,6 +239,7 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
indexedTable.finish(false);
}
GroupByResultsBlock mergedBlock = new GroupByResultsBlock(indexedTable,
_queryContext);
+ mergedBlock.setGroupsTrimmed(_groupsTrimmed);
mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
mergedBlock.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached);
mergedBlock.setNumResizes(indexedTable.getNumResizes());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index c17e64d8de..1afda13ca6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -195,7 +195,11 @@ public class FilteredGroupByOperator extends
BaseOperator<GroupByResultsBlock> {
TableResizer tableResizer = new TableResizer(_dataSchema,
_queryContext);
Collection<IntermediateRecord> intermediateRecords =
tableResizer.trimInSegmentResults(groupKeyGenerator,
groupByResultHolders, trimSize);
+
+
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED,
1);
+ boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag
only if it's not safe
GroupByResultsBlock resultsBlock = new
GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext);
+ resultsBlock.setGroupsTrimmed(unsafeTrim);
resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached);
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
return resultsBlock;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index 33072d5fd4..d28de96107 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -143,7 +143,11 @@ public class GroupByOperator extends
BaseOperator<GroupByResultsBlock> {
if (groupByExecutor.getNumGroups() > trimSize) {
TableResizer tableResizer = new TableResizer(_dataSchema,
_queryContext);
Collection<IntermediateRecord> intermediateRecords =
groupByExecutor.trimGroupByResult(trimSize, tableResizer);
+
+
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED,
1);
+ boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag
only if it's not safe
GroupByResultsBlock resultsBlock = new
GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext);
+ resultsBlock.setGroupsTrimmed(unsafeTrim);
resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached);
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
return resultsBlock;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 02f08661e9..bb49e0543d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -70,6 +70,7 @@ public class ExecutionStatsAggregator {
private long _numSegmentsPrunedByValue = 0L;
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
+ private boolean _groupsTrimmed = false;
private boolean _numGroupsLimitReached = false;
private boolean _numGroupsWarningLimitReached = false;
@@ -228,6 +229,7 @@ public class ExecutionStatsAggregator {
_numTotalDocs += Long.parseLong(numTotalDocsString);
}
+ _groupsTrimmed |=
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.GROUPS_TRIMMED.getName()));
_numGroupsLimitReached |=
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
_numGroupsWarningLimitReached |=
@@ -252,6 +254,7 @@ public class ExecutionStatsAggregator {
brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
brokerResponseNative.setTotalDocs(_numTotalDocs);
+ brokerResponseNative.setGroupsTrimmed(_groupsTrimmed);
brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
brokerResponseNative.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached);
brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 2072e937de..c5647c1b27 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -145,6 +145,10 @@ public class GroupByDataTableReducer implements
DataTableReducer {
throws TimeoutException {
// NOTE: This step will modify the data schema and also return final
aggregate results.
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables,
reducerContext);
+ if (indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) {
+ brokerResponseNative.setGroupsTrimmed(true);
+ }
+
if (brokerMetrics != null) {
brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
brokerMetrics.addValueToTableGauge(rawTableName,
BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 484868f7ce..6c8076091e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.request.context;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -164,6 +165,48 @@ public class QueryContext {
_explain = explain;
}
+ private boolean isSameOrderAndGroupByColumns(QueryContext context) {
+ List<ExpressionContext> groupByKeys = context.getGroupByExpressions();
+ List<OrderByExpressionContext> orderByKeys =
context.getOrderByExpressions();
+
+ if (groupByKeys == null || groupByKeys.isEmpty()) {
+ return orderByKeys == null || orderByKeys.isEmpty();
+ } else if (orderByKeys == null || orderByKeys.isEmpty()) {
+ return false;
+ }
+
+ BitSet orderByKeysMatched = new BitSet(orderByKeys.size());
+
+ OUTER_GROUP:
+ for (ExpressionContext groupExp : groupByKeys) {
+ for (int i = 0; i < orderByKeys.size(); i++) {
+ OrderByExpressionContext orderExp = orderByKeys.get(i);
+ if (groupExp.equals(orderExp.getExpression())) {
+ orderByKeysMatched.set(i);
+ continue OUTER_GROUP;
+ }
+ }
+
+ return false;
+ }
+
+ OUTER_ORDER:
+ for (int i = 0, n = orderByKeys.size(); i < n; i++) {
+ if (orderByKeysMatched.get(i)) {
+ continue;
+ }
+
+ for (ExpressionContext groupExp : groupByKeys) {
+ if (groupExp.equals(orderByKeys.get(i).getExpression())) {
+ continue OUTER_ORDER;
+ }
+ }
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Returns the table name.
* NOTE: on the broker side, table name might be {@code null} when subquery
is available.
@@ -520,6 +563,10 @@ public class QueryContext {
return isIndexUseAllowed(dataSource.getColumnName(), indexType);
}
+ public boolean isUnsafeTrim() {
+ return !isSameOrderAndGroupByColumns(this) || getHavingFilter() != null;
+ }
+
public static class Builder {
private String _tableName;
private QueryContext _subquery;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index 6fcb909374..b5fd08dda4 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -774,6 +774,36 @@ public class InterSegmentAggregationSingleValueQueriesTest
extends BaseSingleVal
assertTrue(brokerResponse.isNumGroupsLimitReached());
}
+ @Test
+ public void testGroupsTrimmedAtSegmentLevel() {
+ String query = "SELECT COUNT(*) FROM testTable GROUP BY column1, column3
ORDER BY column1";
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ assertFalse(brokerResponse.isGroupsTrimmed());
+
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMinSegmentGroupTrimSize(5);
+ brokerResponse = getBrokerResponse(query, planMaker);
+
+ assertTrue(brokerResponse.isGroupsTrimmed());
+ }
+
+ @Test
+ public void testGroupsTrimmedAtServerLevel() {
+ String query = "SELECT COUNT(*) FROM testTable GROUP BY column1, column3
ORDER BY column1";
+
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ assertFalse(brokerResponse.isGroupsTrimmed());
+
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMinServerGroupTrimSize(5);
+ // on server level, trimming occurs only when threshold is reached
+ planMaker.setGroupByTrimThreshold(100);
+ brokerResponse = getBrokerResponse(query, planMaker);
+
+ assertTrue(brokerResponse.isGroupsTrimmed());
+ }
+
@Test
public void testDistinctSum() {
String query = "select DISTINCTSUM(column1) as v1, DISTINCTSUM(column3) as
v2 from testTable";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
index 947c2032e0..4f534e458c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
@@ -207,7 +207,7 @@ public class GroupByEnableTrimOptionIntegrationTest extends
BaseClusterIntegrati
JsonNode plan = postV2Query(option + " set explainAskingServers=true;
explain plan for " + query);
Assert.assertEquals(GroupByOptionsIntegrationTest.toResultStr(result),
expectedResult);
- Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan),
expectedPlan);
+ Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan,
true), expectedPlan);
}
private JsonNode postV2Query(String query)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
index 272f314482..e2251cece3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -473,7 +474,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
JsonNode plan = postV2Query(option + " set explainAskingServers=true;
explain plan for " + query);
Assert.assertEquals(toResultStr(result), expectedResult);
- Assert.assertEquals(toExplainStr(plan), expectedPlan);
+ Assert.assertEquals(toExplainStr(plan, true), expectedPlan);
}
@Test
@@ -550,6 +551,17 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
getExtraQueryProperties());
}
+ public static @NotNull String toResultStr(ResultSetGroup resultSet) {
+ if (resultSet == null) {
+ return "null";
+ }
+ JsonNode node = resultSet.getBrokerResponse().getResultTable();
+ if (node == null) {
+ return toErrorString(resultSet.getBrokerResponse().getExceptions());
+ }
+ return toString(node);
+ }
+
public static @NotNull String toResultStr(JsonNode mainNode) {
if (mainNode == null) {
return "null";
@@ -561,7 +573,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
return toString(node);
}
- static @NotNull String toExplainStr(JsonNode mainNode) {
+ static @NotNull String toExplainStr(JsonNode mainNode, boolean isMSQE) {
if (mainNode == null) {
return "null";
}
@@ -569,7 +581,11 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
if (node == null) {
return toErrorString(mainNode.get("exceptions"));
}
- return toExplainString(node);
+ return toExplainString(node, isMSQE);
+ }
+
+ static @NotNull String toExplainStr(JsonNode mainNode) {
+ return toExplainStr(mainNode, false);
}
public static String toErrorString(JsonNode node) {
@@ -613,8 +629,21 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
return buf.toString();
}
- public static String toExplainString(JsonNode node) {
- return node.get("rows").get(0).get(1).textValue();
+ private static String toExplainString(JsonNode node, boolean isMSQE) {
+ if (isMSQE) {
+ return node.get("rows").get(0).get(1).textValue();
+ } else {
+ StringBuilder result = new StringBuilder();
+ JsonNode rows = node.get("rows");
+ for (int i = 0, n = rows.size(); i < n; i++) {
+ JsonNode row = rows.get(i);
+ result.append(row.get(0).textValue())
+ .append(",\t").append(row.get(1).intValue())
+ .append(",\t").append(row.get(2).intValue())
+ .append('\n');
+ }
+ return result.toString();
+ }
}
@AfterClass
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
new file mode 100644
index 0000000000..51f42729a0
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
@@ -0,0 +1,789 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toExplainStr;
+import static
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+
+// Tests that the 'groupsTrimmed' flag is set when result trimming occurs at:
+// SSQE - segment, inter-segment/server and broker levels
+// MSQE - segment, inter-segment and intermediate levels
+// Note: MSQE doesn't push collations depending on group by result into
aggregation nodes
+// so e.g. ORDER BY i*j doesn't trigger trimming even when hints are set
+public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSet {
+
+ static final int FILES_NO = 4;
+ static final int RECORDS_NO = 1000;
+ static final String I_COL = "i";
+ static final String J_COL = "j";
+ static final int SERVERS_NO = 2;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ startZk();
+ startController();
+ startServers(SERVERS_NO);
+ startBroker();
+
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+ .addSingleValueDimension(I_COL, FieldSpec.DataType.INT)
+ .addSingleValueDimension(J_COL, FieldSpec.DataType.LONG)
+ .build();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
+
+ List<File> avroFiles = createAvroFile(_tempDir);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
+ uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+
+ // Wait for all documents loaded
+ TestUtils.waitForCondition(() ->
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
+ 60_000,
+ "Failed to load documents", true, Duration.ofMillis(60_000 / 10));
+
+ setUseMultiStageQueryEngine(true);
+
+ Map<String, List<String>> map =
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+
+ // make sure segments are split between multiple servers
+ assertEquals(map.size(), SERVERS_NO);
+ }
+
+ protected TableConfig createOfflineTableConfig() {
+ return new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(getTableName())
+ .setNumReplicas(getNumReplicas())
+ .setBrokerTenant(getBrokerTenant())
+ .build();
+ }
+
+ public static List<File> createAvroFile(File tempDir)
+ throws IOException {
+
+ // create avro schema
+ org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ avroSchema.setFields(ImmutableList.of(
+ new org.apache.avro.Schema.Field(I_COL,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT),
null, null),
+ new org.apache.avro.Schema.Field(J_COL,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
null, null)));
+
+ List<File> files = new ArrayList<>();
+ for (int file = 0; file < FILES_NO; file++) {
+ File avroFile = new File(tempDir, "data_" + file + ".avro");
+ try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+
+ for (int docId = 0; docId < RECORDS_NO; docId++) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(I_COL, docId % 100);
+ record.put(J_COL, docId);
+ fileWriter.append(record);
+ }
+ files.add(avroFile);
+ }
+ }
+ return files;
+ }
+
+ // MSQE - multi stage query engine
+ @Test
+ public void testMSQEOrderByOnDependingOnAggregateResultIsNotPushedDown()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY i*j DESC LIMIT 5"));
+
+ String options = "SET minSegmentGroupTrimSize=5; ";
+ String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
+ + "FROM mytable GROUP BY i, j ORDER BY i*j DESC LIMIT 5 ";
+
+ ResultSetGroup result = conn.execute(options + query);
+ assertTrimFlagNotSet(result);
+
+ assertEquals("Execution Plan\n"
+ + "LogicalSort(sort0=[$3], dir0=[DESC], offset=[0], fetch=[5])\n"
+ + " PinotLogicalSortExchange(distribution=[hash], collation=[[3
DESC]], isSortOnSender=[false], "
+ + "isSortOnReceiver=[true])\n"
+ + " LogicalSort(sort0=[$3], dir0=[DESC], fetch=[5])\n" // <--
actual sort & limit
+ + " LogicalProject(i=[$0], j=[$1], EXPR$2=[$2], EXPR$3=[*($0,
$1)])\n"
+ // <-- order by value is computed here, so trimming in upstream
stages is not possible
+ + " PinotLogicalAggregate(group=[{0, 1}],
agg#0=[COUNT($2)], aggType=[FINAL])\n"
+ + " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " CombineGroupBy\n"
+ + " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
+ + " Project(columns=[[i, j]])\n"
+ + " DocIdSet(maxDocs=[40000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[4000])\n",
+ toExplainStr(postQuery(options + " SET explainAskingServers=true;
EXPLAIN PLAN FOR " + query), true));
+ }
+
+ @Test
+ public void
testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+
+ String options = "SET minSegmentGroupTrimSize=5; ";
+ String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
+ + "FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5 ";
+
+ ResultSetGroup result = conn.execute(options + query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+ + "99,\t999,\t4\n"
+ + "98,\t998,\t4\n"
+ + "97,\t997,\t4\n"
+ + "96,\t996,\t4\n"
+ + "95,\t995,\t4", toResultStr(result));
+
+ assertEquals("Execution Plan\n"
+ + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n"
+ + " PinotLogicalSortExchange(distribution=[hash], collation=[[1
DESC]], isSortOnSender=[false], "
+ + "isSortOnReceiver=[true])\n"
+ + " LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n"
+ + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[1 DESC]],"
+ + " limit=[5])\n"
+ + " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " CombineGroupBy\n"
+ + " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n" // <-- trimming happens here
+ + " Project(columns=[[i, j]])\n"
+ + " DocIdSet(maxDocs=[40000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[4000])\n",
+ toExplainStr(postQuery(options + " SET explainAskingServers=true;
EXPLAIN PLAN FOR " + query), true));
+ }
+
+ @Test
+ public void
testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(
+ conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER
BY COUNT(*) DESC LIMIT 5"));
+
+ String options = "SET minSegmentGroupTrimSize=5; ";
+ String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
+ + "FROM mytable GROUP BY i, j ORDER BY count(*) DESC LIMIT 5 ";
+
+ ResultSetGroup result = conn.execute(options + query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+ + "77,\t377,\t4\n"
+ + "66,\t566,\t4\n"
+ + "39,\t339,\t4\n"
+ + "96,\t396,\t4\n"
+ + "25,\t25,\t4", toResultStr(result));
+
+ assertEquals("Execution Plan\n"
+ + "LogicalSort(sort0=[$2], dir0=[DESC], offset=[0], fetch=[5])\n"
+ + " PinotLogicalSortExchange(distribution=[hash], collation=[[2
DESC]], isSortOnSender=[false], "
+ + "isSortOnReceiver=[true])\n"
+ + " LogicalSort(sort0=[$2], dir0=[DESC], fetch=[5])\n"
+ + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[2 DESC]],"
+ + " limit=[5])\n"
+ + " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " CombineGroupBy\n"
+ + " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n" //<-- trimming happens here
+ + " Project(columns=[[i, j]])\n"
+ + " DocIdSet(maxDocs=[40000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[4000])\n",
+ toExplainStr(postQuery(options + " SET explainAskingServers=true;
EXPLAIN PLAN FOR " + query), true));
+ }
+
+ @Test
+ public void
testMSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+
+ String options = "SET minServerGroupTrimSize = 5; SET groupTrimThreshold =
100; ";
+ String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
+ + "FROM mytable "
+ + "GROUP BY i, j "
+ + "ORDER BY j DESC "
+ + "LIMIT 5 ";
+ ResultSetGroup result = conn.execute(options + query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+ + "99,\t999,\t4\n"
+ + "98,\t998,\t4\n"
+ + "97,\t997,\t4\n"
+ + "96,\t996,\t4\n"
+ + "95,\t995,\t4", toResultStr(result));
+
+ assertEquals("Execution Plan\n"
+ + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n"
+ + " PinotLogicalSortExchange(distribution=[hash], collation=[[1
DESC]], isSortOnSender=[false], "
+ + "isSortOnReceiver=[true])\n"
+ + " LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n"
+ + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[1 DESC]],"
+ + " limit=[5])\n"
+ + " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " CombineGroupBy\n" // <-- trimming happens here
+ + " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
+ + " Project(columns=[[i, j]])\n"
+ + " DocIdSet(maxDocs=[40000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[4000])\n",
+ toExplainStr(postQuery(options + "SET explainAskingServers=true;
EXPLAIN PLAN FOR " + query), true));
+ }
+
+ @Test
+ public void
testMSQEGroupsTrimmedAtIntermediateLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+
+ // This case is tricky because intermediate results are hash-split among
servers so one gets 50 rows on average.
+ // That's the reason both the limit and the trim size need to be so small.
+ String query = "SELECT /*+
aggOptions(is_enable_group_trim='true',mse_min_group_trim_size='5') */ i, j,
COUNT(*) "
+ + "FROM mytable "
+ + "GROUP BY i, j "
+ + "ORDER BY j DESC "
+ + "LIMIT 5 ";
+ ResultSetGroup result = conn.execute(query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+ + "99,\t999,\t4\n"
+ + "98,\t998,\t4\n"
+ + "97,\t997,\t4\n"
+ + "96,\t996,\t4\n"
+ + "95,\t995,\t4", toResultStr(result));
+
+ assertEquals(
+ "Execution Plan\n"
+ + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n"
+ + " PinotLogicalSortExchange(distribution=[hash], collation=[[1
DESC]], isSortOnSender=[false], "
+ + "isSortOnReceiver=[true])\n"
+ + " LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n"
+ + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[1 DESC]],"
+ + " limit=[5])\n" // receives 50-row-big blocks, trimming kicks in
only if limit is lower
+ + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" //
splits blocks via hash distribution
+ + " LeafStageCombineOperator(table=[mytable])\n" // no
trimming happens 'below'
+ + " StreamingInstanceResponse\n"
+ + " CombineGroupBy\n"
+ + " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
+ + " Project(columns=[[i, j]])\n"
+ + " DocIdSet(maxDocs=[40000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[4000])\n",
+ toExplainStr(postQuery(" set explainAskingServers=true; EXPLAIN PLAN
FOR " + query), true));
+ }
+
+ // SSQE segment level
+ @Test
+ public void
testSSQEFilteredGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+ String query = "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM mytable
GROUP BY i, j ORDER BY i + j DESC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " +
query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"sum(i) FILTER(WHERE i >
'0')\"[\"DOUBLE\"]\n"
+ + "99,\t999,\t396.0\n"
+ + "98,\t998,\t392.0\n"
+ + "97,\t997,\t388.0\n"
+ + "96,\t996,\t384.0\n"
+ + "95,\t995,\t380.0", toResultStr(result));
+
+ assertEquals(
+ "BROKER_REDUCE(sort:[plus(i,j)
DESC],limit:5,postAggregations:filter(sum(i),greater_than(i,'0'))),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n"
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY_FILTERED(groupKeys:i, j,
aggregations:sum(i)),\t3,\t2\n" // <-- trimming happens here
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_FULL_SCAN(operator:RANGE,predicate:i > '0'),\t6,\t5\n"
+ + "PROJECT(i, j),\t7,\t3\n"
+ + "DOC_ID_SET,\t8,\t7\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t9,\t8\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i + j DESC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " +
query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+ + "99,\t999,\t4\n"
+ + "98,\t998,\t4\n"
+ + "97,\t997,\t4\n"
+ + "96,\t996,\t4\n"
+ + "95,\t995,\t4", toResultStr(result));
+
+ assertEquals("BROKER_REDUCE(sort:[plus(i,j) DESC],limit:5),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n"
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
//<-- trimming happens here
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderBySomeGroupByKeysIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
j DESC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " +
query);
+ assertTrimFlagSet(result);
+
+ // With this test data set the result is stable, but in general, trimming data
ordered by only a subset of
+ // the group by keys can produce incomplete group aggregates due to lack of
sorting stability.
+ // That is because, for a given value of j, sorting treats all values of i
the same,
+ // and segment data is usually unordered.
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+ + "99,\t999,\t4\n"
+ + "98,\t998,\t4\n"
+ + "97,\t997,\t4\n"
+ + "96,\t996,\t4\n"
+ + "95,\t995,\t4", toResultStr(result));
+
+ assertEquals(
+ "BROKER_REDUCE(sort:[j DESC],limit:5),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n"
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" //
<- trimming happens here
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAllGroupByKeysAndHavingIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+
+ // trimming is safe on rows ordered by all group by keys (regardless of
key order, direction or duplications)
+ // but not when HAVING clause is present
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i
> 50 ORDER BY i ASC, j ASC";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " +
query);
+ assertTrimFlagSet(result);
+
+ // Result is unexpectedly empty because segment-level trim keeps first 50
records ordered by i ASC, j ASC
+ // that are later filtered out at broker stage.
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]",
toResultStr(result));
+
+ assertEquals("BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j
ASC],limit:10),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n"
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" //
<- trimming happens here
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAllGroupByKeysIsSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+
+ // trimming is safe on rows ordered by all group by keys (regardless of
key order, direction or duplications)
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
j ASC, i DESC, j ASC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " +
query);
+ assertTrimFlagNotSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+ + "0,\t0,\t4\n"
+ + "1,\t1,\t4\n"
+ + "2,\t2,\t4\n"
+ + "3,\t3,\t4\n"
+ + "4,\t4,\t4", toResultStr(result));
+
+ assertEquals(
+ "BROKER_REDUCE(sort:[j ASC, i DESC],limit:5),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n"
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAggregateIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+
+ // trimming is NOT safe when rows are ordered by an expression that depends on
an aggregate result (count(*)*j)
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
count(*)*j ASC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " +
query);
+ assertTrimFlagSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+ + "0,\t0,\t4\n"
+ + "1,\t1,\t4\n"
+ + "2,\t2,\t4\n"
+ + "3,\t3,\t4\n"
+ + "4,\t4,\t4", toResultStr(result));
+
+ assertEquals("BROKER_REDUCE(sort:[times(count(*),j)
ASC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n"
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ // SSQE inter-segment level
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+ throws Exception {
+ setUseMultiStageQueryEngine(false);
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i DESC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ // on server level, trimming occurs only when threshold is reached
+ ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET
groupTrimThreshold = 50; " + query);
+ assertTrimFlagSet(result);
+ // result's order is not stable due to concurrent operations on indexed
table
+
+ assertEquals("BROKER_REDUCE(sort:[i DESC],limit:5),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n" // <-- trimming happens here
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysIsSafe()
+ throws Exception {
+ // for SSQE server level == inter-segment level
+ setUseMultiStageQueryEngine(false);
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i, j LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ // on server level, trimming occurs only when threshold is reached
+ ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET
groupTrimThreshold = 100; " + query);
+ assertTrimFlagNotSet(result);
+
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+ + "0,\t0,\t4\n"
+ + "0,\t100,\t4\n"
+ + "0,\t200,\t4\n"
+ + "0,\t300,\t4\n"
+ + "0,\t400,\t4", toResultStr(result));
+
+ assertEquals(
+ "BROKER_REDUCE(sort:[i ASC, j ASC],limit:5),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
+ @Test
+ public void
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAggregateIsNotSafe()
+ throws Exception {
+ // for SSQE server level == inter-segment level
+ setUseMultiStageQueryEngine(false);
+ String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
count(*)*j DESC LIMIT 5";
+
+ Connection conn = getPinotConnection();
+ assertTrimFlagNotSet(conn.execute(query));
+
+ // on server level, trimming occurs only when threshold is reached
+ ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET
groupTrimThreshold = 100; " + query);
+ assertTrimFlagSet(result);
+
+ // Result, though unstable due to concurrent operations on IndexedTable,
is similar to the following
+ // (which is not correct):
+ //i[INT],j[LONG],count(*)[LONG]
+ //98,\t998,\t4
+ //94,\t994,\t4
+ //90,\t990,\t4
+ //86,\t986,\t4
+ //79,\t979,\t4
+
+ assertEquals("BROKER_REDUCE(sort:[times(count(*),j)
DESC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n"
+ + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
+ + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+ + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+ + "PROJECT(i, j),\t4,\t3\n"
+ + "DOC_ID_SET,\t5,\t4\n"
+ + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+ toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+ }
+
  @Test
  public void testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysAndHavingIsNotSafe()
      throws Exception {
    setUseMultiStageQueryEngine(false);
    // HAVING is evaluated at the broker after inter-segment trimming, so trimming on the
    // ORDER BY keys alone can remove every row that would have survived the filter.
    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC LIMIT 5";

    Connection conn = getPinotConnection();
    assertTrimFlagNotSet(conn.execute(query));

    // on server level, trimming occurs only when threshold is reached
    ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query);
    assertTrimFlagSet(result);

    // Result is unexpectedly empty because inter-segment-level trim keeps first 25 records ordered by i ASC, j ASC
    // that are later filtered out at broker stage.
    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", toResultStr(result));

    assertEquals("BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j ASC],limit:5),\t1,\t0\n"
        + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
        + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
        + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
        + "PROJECT(i, j),\t4,\t3\n"
        + "DOC_ID_SET,\t5,\t4\n"
        + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
  }
+
+ // SSQE broker level
+
  @Test
  public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysIsSafe()
      throws Exception {
    setUseMultiStageQueryEngine(false);
    // Ordering on all group-by keys makes trimming safe: the retained prefix is exactly the
    // final result, so the groupsTrimmed flag must remain unset even when trimming triggers.
    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY i, j LIMIT 5";

    Connection conn = getPinotConnection();
    assertTrimFlagNotSet(conn.execute(query));

    // on broker level, trimming occurs only when threshold is reached
    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query);
    assertTrimFlagNotSet(result);

    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
        + "0,\t0,\t4\n"
        + "0,\t100,\t4\n"
        + "0,\t200,\t4\n"
        + "0,\t300,\t4\n"
        + "0,\t400,\t4", toResultStr(result));

    assertEquals("BROKER_REDUCE(sort:[i ASC, j ASC],limit:5),\t1,\t0\n" //<-- trimming happens here
        + "COMBINE_GROUP_BY,\t2,\t1\n"
        + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
        + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
        + "PROJECT(i, j),\t4,\t3\n"
        + "DOC_ID_SET,\t5,\t4\n"
        + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
  }
+
  @Test
  public void testSSQEGroupsTrimmedAtBrokerLevelOrderedBySomeGroupByKeysIsNotSafe()
      throws Exception {
    setUseMultiStageQueryEngine(false);
    // Ordering on only a subset of the group-by keys is not trim-safe, so the
    // groupsTrimmed flag must be raised once broker-level trimming kicks in.
    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5";

    Connection conn = getPinotConnection();
    ResultSetGroup result1 = conn.execute(query);
    assertTrimFlagNotSet(result1);

    // on broker level, trimming occurs only when threshold is reached
    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query);
    assertTrimFlagSet(result);

    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
        + "99,\t999,\t4\n"
        + "98,\t998,\t4\n"
        + "97,\t997,\t4\n"
        + "96,\t996,\t4\n"
        + "95,\t995,\t4", toResultStr(result));

    assertEquals("BROKER_REDUCE(sort:[j DESC],limit:5),\t1,\t0\n" //<-- trimming happens here
        + "COMBINE_GROUP_BY,\t2,\t1\n"
        + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
        + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
        + "PROJECT(i, j),\t4,\t3\n"
        + "DOC_ID_SET,\t5,\t4\n"
        + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
  }
+
  @Test
  public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysAndHavingIsNotSafe()
      throws Exception {
    setUseMultiStageQueryEngine(false);
    // HAVING runs after trimming, so even an order on all group-by keys is not safe here.
    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC LIMIT 5";

    Connection conn = getPinotConnection();
    ResultSetGroup result1 = conn.execute(query);
    assertTrimFlagNotSet(result1);

    // on broker level, trimming occurs only when threshold is reached
    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query);
    assertTrimFlagSet(result);

    // Result is unexpectedly empty because broker-level trim (this test sets minBrokerGroupTrimSize;
    // the plan below marks BROKER_REDUCE as the trim point) keeps first 50 records ordered by
    // i ASC, j ASC that are later filtered out at broker stage.
    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", toResultStr(result));

    assertEquals(
        "BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j ASC],limit:5),\t1,\t0\n" //<-- trimming happens here
            + "COMBINE_GROUP_BY,\t2,\t1\n"
            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
            + "PROJECT(i, j),\t4,\t3\n"
            + "DOC_ID_SET,\t5,\t4\n"
            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
  }
+
  @Test
  public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByAggregateIsNotSafe()
      throws Exception {
    setUseMultiStageQueryEngine(false);
    // Ordering on a post-aggregation expression is never trim-safe: the broker cannot know
    // the final order of a group until all segments' partial aggregates are combined.
    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY count(*)*j DESC LIMIT 5";

    Connection conn = getPinotConnection();
    assertTrimFlagNotSet(conn.execute(query));

    // on broker level, trimming occurs only when threshold is reached
    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query);
    assertTrimFlagSet(result);

    // result is similar to the following, but unstable:
    //i[INT],j[LONG],count(*)[LONG]
    //99,999,4
    //98,998,4
    //97,997,4
    //96,996,4
    //82,982,4

    assertEquals("BROKER_REDUCE(sort:[times(count(*),j) DESC],limit:5," //<-- trimming happens here
        + "postAggregations:times(count(*),j)),\t1,\t0\n"
        + "COMBINE_GROUP_BY,\t2,\t1\n"
        + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
        + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
        + "PROJECT(i, j),\t4,\t3\n"
        + "DOC_ID_SET,\t5,\t4\n"
        + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
  }
+
+ private static void assertTrimFlagNotSet(ResultSetGroup result) {
+
assertFalse(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed());
+ }
+
+ private static void assertTrimFlagSet(ResultSetGroup result) {
+
assertTrue(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed());
+ }
+
  @AfterClass
  public void tearDown()
      throws Exception {
    // Drop the test table first, then stop cluster components in reverse start order
    // (server -> broker -> controller -> ZooKeeper).
    dropOfflineTable(DEFAULT_TABLE_NAME);

    stopServer();
    stopBroker();
    stopController();
    stopZk();

    // Remove the temporary data/segment directory created for this test run.
    FileUtils.deleteDirectory(_tempDir);
  }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 16115fa728..f9fb427dd1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -229,6 +229,10 @@ public class AggregateOperator extends MultiStageOperator {
return _eosBlock;
} else {
MseBlock dataBlock = new RowHeapDataBlock(rows, _resultSchema,
_aggFunctions);
+ if (_groupByExecutor.getRowsProcessed() > _groupTrimSize) {
+ _statMap.merge(StatKey.GROUPS_TRIMMED, true);
+ }
+
if (_groupByExecutor.isNumGroupsLimitReached()) {
if (_errorOnNumGroupsLimit) {
_input.earlyTerminate();
@@ -461,6 +465,7 @@ public class AggregateOperator extends MultiStageOperator {
return true;
}
},
+ GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
//@formatter:on
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 20c96fdcf1..fab26d033b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -240,6 +240,9 @@ public class LeafOperator extends MultiStageOperator {
case TOTAL_DOCS:
_statMap.merge(StatKey.TOTAL_DOCS,
Long.parseLong(entry.getValue()));
break;
+ case GROUPS_TRIMMED:
+ _statMap.merge(StatKey.GROUPS_TRIMMED,
Boolean.parseBoolean(entry.getValue()));
+ break;
case NUM_GROUPS_LIMIT_REACHED:
_statMap.merge(StatKey.NUM_GROUPS_LIMIT_REACHED,
Boolean.parseBoolean(entry.getValue()));
break;
@@ -654,6 +657,7 @@ public class LeafOperator extends MultiStageOperator {
NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT),
NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT),
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
+ GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_RESIZES(StatMap.Type.INT, null),
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index d9a285d342..3b179ed4b8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -236,6 +236,7 @@ public abstract class MultiStageOperator
public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
@SuppressWarnings("unchecked")
StatMap<AggregateOperator.StatKey> stats =
(StatMap<AggregateOperator.StatKey>) map;
+
response.mergeGroupsTrimmed(stats.getBoolean(AggregateOperator.StatKey.GROUPS_TRIMMED));
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
response.mergeNumGroupsWarningLimitReached(
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED));
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index 9b978c4c43..7eafbd858e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -60,6 +60,7 @@ public class MultistageGroupByExecutor {
private final AggType _aggType;
private final boolean _leafReturnFinalResult;
private final DataSchema _resultSchema;
+ private int _rowsProcessed;
private final int _numGroupsLimit;
private final int _numGroupsWarningLimit;
private final boolean _filteredAggregationsSkipEmptyGroups;
@@ -206,12 +207,14 @@ public class MultistageGroupByExecutor {
_groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions);
int idx = 0;
- while (idx++ < numGroups && groupKeyIterator.hasNext()) {
+ while (idx < numGroups && groupKeyIterator.hasNext()) {
Object[] row = getRow(groupKeyIterator, numKeys, numFunctions,
resultStoredTypes);
sortedRows.add(row);
+ idx++;
}
while (groupKeyIterator.hasNext()) {
+ idx++;
// TODO: allocate new array row only if row enters set
Object[] row = getRow(groupKeyIterator, numKeys, numFunctions,
resultStoredTypes);
if (comparator.compare(sortedRows.peek(), row) < 0) {
@@ -220,6 +223,8 @@ public class MultistageGroupByExecutor {
}
}
+ _rowsProcessed = idx;
+
int resultSize = sortedRows.size();
ArrayList<Object[]> result = new ArrayList<>(sortedRows.size());
for (int i = resultSize - 1; i >= 0; i--) {
@@ -245,9 +250,13 @@ public class MultistageGroupByExecutor {
_groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions);
int idx = 0;
- while (groupKeyIterator.hasNext() && idx++ < numGroups) {
+ while (groupKeyIterator.hasNext() && idx < numGroups) {
Object[] row = getRow(groupKeyIterator, numKeys, numFunctions,
resultStoredTypes);
rows.add(row);
+ idx++;
+ }
+ if (groupKeyIterator.hasNext()) {
+ _rowsProcessed = idx + 1;
}
return rows;
}
@@ -292,6 +301,10 @@ public class MultistageGroupByExecutor {
return _groupIdGenerator.getNumGroups();
}
+ public int getRowsProcessed() {
+ return _rowsProcessed;
+ }
+
public boolean isNumGroupsLimitReached() {
return _groupIdGenerator.getNumGroups() == _numGroupsLimit;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 114752ae0e..3e928943d4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -62,6 +62,7 @@ public class DefaultRequestContext implements RequestScope {
private long _realtimeTotalMemAllocatedBytes;
private int _numServersQueried;
private int _numServersResponded;
+ private boolean _groupsTrimmed;
private boolean _isNumGroupsLimitReached;
private int _numExceptions;
private String _brokerId;
@@ -340,6 +341,11 @@ public class DefaultRequestContext implements RequestScope
{
return _realtimeThreadCpuTimeNs;
}
+ @Override
+ public boolean isGroupsTrimmed() {
+ return _groupsTrimmed;
+ }
+
@Override
public boolean isNumGroupsLimitReached() {
return _isNumGroupsLimitReached;
@@ -480,6 +486,11 @@ public class DefaultRequestContext implements RequestScope
{
_numServersResponded = numServersResponded;
}
+ @Override
+ public void setGroupsTrimmed(boolean groupsTrimmed) {
+ _groupsTrimmed = groupsTrimmed;
+ }
+
@Override
public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
_isNumGroupsLimitReached = numGroupsLimitReached;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index e49f3e769f..fda3e4d4b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -144,6 +144,8 @@ public interface RequestContext {
long getRealtimeTotalMemAllocatedBytes();
void setRealtimeTotalMemAllocatedBytes(long realtimeTotalMemAllocatedBytes);
+ boolean isGroupsTrimmed();
+
boolean isNumGroupsLimitReached();
int getNumExceptions();
@@ -176,6 +178,8 @@ public interface RequestContext {
void setNumServersResponded(int numServersResponded);
+ void setGroupsTrimmed(boolean groupsTrimmed);
+
void setNumGroupsLimitReached(boolean numGroupsLimitReached);
void setNumExceptions(int numExceptions);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]