This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 969bbf00d4 Add tablesQueried metadata to BrokerResponse (#14384)
969bbf00d4 is described below
commit 969bbf00d4ab0a421beaf75fdad9b189997f0e59
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Tue Nov 5 13:23:36 2024 +0530
Add tablesQueried metadata to BrokerResponse (#14384)
tablesQueried is a set of tables that were queried in the request. The
field is set for both single stage and multi-stage queries.
---
.../BaseSingleStageBrokerRequestHandler.java | 1 +
.../MultiStageBrokerRequestHandler.java | 1 +
.../pinot/common/response/BrokerResponse.java | 13 ++++++++++
.../response/broker/BrokerResponseNative.java | 16 +++++++++++-
.../response/broker/BrokerResponseNativeV2.java | 17 ++++++++++++-
.../tests/MultiStageEngineIntegrationTest.java | 29 ++++++++++++++++++++++
6 files changed, 75 insertions(+), 2 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 42ae1d13ab..89868cf6fa 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -819,6 +819,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable,
remainingTimeMs, serverStats,
requestContext);
}
+ brokerResponse.setTablesQueried(Set.of(rawTableName));
for (ProcessingException exception : exceptions) {
brokerResponse.addException(exception);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index c199665425..f61138b3b2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -246,6 +246,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
+ brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 5aed5106ab..a1db078a3c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
@@ -294,4 +295,16 @@ public interface BrokerResponse {
* Returns the trace info for the query execution when tracing is enabled,
empty map otherwise.
*/
Map<String, String> getTraceInfo();
+
+ /**
+ * Set the tables queried in the request
+ * @param tablesQueried Set of tables queried
+ */
+ void setTablesQueried(Set<String> tablesQueried);
+
+ /**
+ * Get the tables queried in the request
+ * @return Set of tables queried
+ */
+ Set<String> getTablesQueried();
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 83a30a10a3..6d8cdac132 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -29,7 +29,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
@@ -51,7 +53,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs",
- "explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo"
+ "explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried"
})
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerResponseNative implements BrokerResponse {
@@ -99,6 +101,7 @@ public class BrokerResponseNative implements BrokerResponse {
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private Map<String, String> _traceInfo = new HashMap<>();
+ private Set<String> _tablesQueried = Set.of();
public BrokerResponseNative() {
}
@@ -485,4 +488,15 @@ public class BrokerResponseNative implements
BrokerResponse {
public void setTraceInfo(Map<String, String> traceInfo) {
_traceInfo = traceInfo;
}
+
+ @Override
+ public void setTablesQueried(@NotNull Set<String> tablesQueried) {
+ _tablesQueried = tablesQueried;
+ }
+
+ @Override
+ @NotNull
+ public Set<String> getTablesQueried() {
+ return _tablesQueried;
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 2a4e80f77a..8e9a4d6b81 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -25,7 +25,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
@@ -45,7 +47,8 @@ import org.apache.pinot.common.response.ProcessingException;
"numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
"brokerReduceTimeMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
- "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo"
+ "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo",
+ "tablesQueried"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -73,6 +76,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse
{
private int _numServersQueried;
private int _numServersResponded;
private long _brokerReduceTimeMs;
+ private Set<String> _tablesQueried = Set.of();
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@@ -384,4 +388,15 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
return _type;
}
}
+
+ @Override
+ public void setTablesQueried(@NotNull Set<String> tablesQueried) {
+ _tablesQueried = tablesQueried;
+ }
+
+ @Override
+ @NotNull
+ public Set<String> getTablesQueried() {
+ return _tablesQueried;
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index e91f9cc719..029b061666 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -56,6 +56,7 @@ import org.testng.annotations.Test;
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -1222,6 +1223,34 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
checkQueryPlanningErrorForDBTest(result,
QueryException.QUERY_PLANNING_ERROR_CODE);
}
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testTablesQueriedField(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = "select sum(ActualElapsedTime) from mytable;";
+ JsonNode jsonNode = postQuery(query);
+ JsonNode tablesQueried = jsonNode.get("tablesQueried");
+ assertNotNull(tablesQueried);
+ assertTrue(tablesQueried.isArray());
+ assertEquals(tablesQueried.size(), 1);
+ assertEquals(tablesQueried.get(0).asText(), "mytable");
+ }
+
+ @Test
+ public void testTablesQueriedWithJoin()
+ throws Exception {
+ // Self Join
+ String query = "select sum(ActualElapsedTime) from mytable WHERE
ActualElapsedTime > "
+ + "(select avg(ActualElapsedTime) as avg_profit from mytable)";
+ JsonNode jsonNode = postQuery(query);
+ JsonNode tablesQueried = jsonNode.get("tablesQueried");
+ assertNotNull(tablesQueried);
+ assertTrue(tablesQueried.isArray());
+ assertEquals(tablesQueried.size(), 1);
+ assertEquals(tablesQueried.get(0).asText(), "mytable");
+ }
+
+
private void checkQueryResultForDBTest(String column, String tableName)
throws Exception {
checkQueryResultForDBTest(column, tableName, null, null);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]