This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 969bbf00d4 Add tablesQueried metadata to BrokerResponse (#14384)
969bbf00d4 is described below

commit 969bbf00d4ab0a421beaf75fdad9b189997f0e59
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Tue Nov 5 13:23:36 2024 +0530

    Add tablesQueried metadata to BrokerResponse (#14384)
    
    tablesQueried is a set of tables that were queried in the request. The
    field is set for both single-stage and multi-stage queries.
---
 .../BaseSingleStageBrokerRequestHandler.java       |  1 +
 .../MultiStageBrokerRequestHandler.java            |  1 +
 .../pinot/common/response/BrokerResponse.java      | 13 ++++++++++
 .../response/broker/BrokerResponseNative.java      | 16 +++++++++++-
 .../response/broker/BrokerResponseNativeV2.java    | 17 ++++++++++++-
 .../tests/MultiStageEngineIntegrationTest.java     | 29 ++++++++++++++++++++++
 6 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 42ae1d13ab..89868cf6fa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -819,6 +819,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
             offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
             requestContext);
       }
+      brokerResponse.setTablesQueried(Set.of(rawTableName));
 
       for (ProcessingException exception : exceptions) {
         brokerResponse.addException(exception);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index c199665425..f61138b3b2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -246,6 +246,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
     brokerResponse.setResultTable(queryResults.getResultTable());
+    brokerResponse.setTablesQueried(tableNames);
     // TODO: Add servers queried/responded stats
     brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 5aed5106ab..a1db078a3c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -294,4 +295,16 @@ public interface BrokerResponse {
    * Returns the trace info for the query execution when tracing is enabled, empty map otherwise.
    */
   Map<String, String> getTraceInfo();
+
+  /**
+   * Set the tables queried in the request
+   * @param tablesQueried Set of tables queried
+   */
+  void setTablesQueried(Set<String> tablesQueried);
+
+  /**
+   * Get the tables queried in the request
+   * @return Set of tables queried
+   */
+  Set<String> getTablesQueried();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 83a30a10a3..6d8cdac132 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -29,7 +29,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.ProcessingException;
@@ -51,7 +53,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
     "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
     "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
-    "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo"
+    "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried"
 })
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BrokerResponseNative implements BrokerResponse {
@@ -99,6 +101,7 @@ public class BrokerResponseNative implements BrokerResponse {
   private long _explainPlanNumEmptyFilterSegments = 0L;
   private long _explainPlanNumMatchAllFilterSegments = 0L;
   private Map<String, String> _traceInfo = new HashMap<>();
+  private Set<String> _tablesQueried = Set.of();
 
   public BrokerResponseNative() {
   }
@@ -485,4 +488,15 @@ public class BrokerResponseNative implements BrokerResponse {
   public void setTraceInfo(Map<String, String> traceInfo) {
     _traceInfo = traceInfo;
   }
+
+  @Override
+  public void setTablesQueried(@NotNull Set<String> tablesQueried) {
+    _tablesQueried = tablesQueried;
+  }
+
+  @Override
+  @NotNull
+  public Set<String> getTablesQueried() {
+    return _tablesQueried;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 2a4e80f77a..8e9a4d6b81 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -25,7 +25,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.ProcessingException;
@@ -45,7 +47,8 @@ import org.apache.pinot.common.response.ProcessingException;
     "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs",
     "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
     "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
-    "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo"
+    "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo",
+    "tablesQueried"
 })
 public class BrokerResponseNativeV2 implements BrokerResponse {
   private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -73,6 +76,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
   private int _numServersQueried;
   private int _numServersResponded;
   private long _brokerReduceTimeMs;
+  private Set<String> _tablesQueried = Set.of();
 
   @JsonInclude(JsonInclude.Include.NON_NULL)
   @Nullable
@@ -384,4 +388,15 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
       return _type;
     }
   }
+
+  @Override
+  public void setTablesQueried(@NotNull Set<String> tablesQueried) {
+    _tablesQueried = tablesQueried;
+  }
+
+  @Override
+  @NotNull
+  public Set<String> getTablesQueried() {
+    return _tablesQueried;
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index e91f9cc719..029b061666 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -56,6 +56,7 @@ import org.testng.annotations.Test;
 import static org.apache.pinot.common.function.scalar.StringFunctions.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -1222,6 +1223,34 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     checkQueryPlanningErrorForDBTest(result, QueryException.QUERY_PLANNING_ERROR_CODE);
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testTablesQueriedField(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = "select sum(ActualElapsedTime) from mytable;";
+    JsonNode jsonNode = postQuery(query);
+    JsonNode tablesQueried = jsonNode.get("tablesQueried");
+    assertNotNull(tablesQueried);
+    assertTrue(tablesQueried.isArray());
+    assertEquals(tablesQueried.size(), 1);
+    assertEquals(tablesQueried.get(0).asText(), "mytable");
+  }
+
+  @Test
+  public void testTablesQueriedWithJoin()
+      throws Exception {
+    // Self Join
+    String query = "select sum(ActualElapsedTime) from mytable WHERE ActualElapsedTime > "
+        + "(select avg(ActualElapsedTime) as avg_profit from mytable)";
+    JsonNode jsonNode = postQuery(query);
+    JsonNode tablesQueried = jsonNode.get("tablesQueried");
+    assertNotNull(tablesQueried);
+    assertTrue(tablesQueried.isArray());
+    assertEquals(tablesQueried.size(), 1);
+    assertEquals(tablesQueried.get(0).asText(), "mytable");
+  }
+
+
   private void checkQueryResultForDBTest(String column, String tableName)
       throws Exception {
     checkQueryResultForDBTest(column, tableName, null, null);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to