This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0984168988 [multistage] support multi-tenant lookup for v2 engine (#11241)
0984168988 is described below

commit 0984168988551b97c5a2884f3d2588e75975ff3b
Author: Rong Rong <[email protected]>
AuthorDate: Fri Aug 4 12:52:17 2023 -0700

    [multistage] support multi-tenant lookup for v2 engine (#11241)
    
    * support multi-tenant lookup for v2 engine
    
    ---------
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../api/resources/PinotQueryResource.java          | 83 ++++++++++------------
 .../helix/core/PinotHelixResourceManager.java      | 10 +++
 2 files changed, 49 insertions(+), 44 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 2b48acd5f7..a913ca33ec 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -38,6 +38,7 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
@@ -56,6 +57,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessControl;
@@ -202,7 +204,7 @@ public class PinotQueryResource {
     QueryEnvironment queryEnvironment = new QueryEnvironment(new 
TypeFactory(new TypeSystem()),
         CalciteSchemaBuilder.asRootSchema(new 
PinotCatalog(_pinotHelixResourceManager.getTableCache())), null, null);
     List<String> tableNames = queryEnvironment.getTableNamesForQuery(query);
-    String brokerTenant;
+    List<String> instanceIds;
     if (tableNames.size() != 0) {
       List<TableConfig> tableConfigList = getListTableConfigs(tableNames);
       if (tableConfigList == null || tableConfigList.size() == 0) {
@@ -210,35 +212,19 @@ public class PinotQueryResource {
             new Exception("Unable to find table in cluster, table does not 
exist")).toString();
       }
 
-      // When routing a query, there should be at least one common broker 
tenant for the table. However, the server
-      // tenants can be completely disjoint. The leaf stages which access 
segments will be processed on the respective
-      // server tenants for each table. The intermediate stages can be 
processed in either or all of the server tenants
-      // belonging to the tables.
-      brokerTenant = getCommonBrokerTenant(tableConfigList);
-      if (brokerTenant == null) {
+      // find the unions of all the broker tenant tags of the queried tables.
+      Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList);
+      if (brokerTenantsUnion.isEmpty()) {
         return 
QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR, new 
Exception(
-            String.format("Unable to dispatch multistage query with multiple 
tables : %s " + "on different tenant",
-                tableNames))).toString();
+            String.format("Unable to dispatch multistage query for tables: 
[%s]", tableNames))).toString();
       }
+      instanceIds = findCommonBrokerInstances(brokerTenantsUnion);
     } else {
       // TODO fail these queries going forward. Added this logic to take care 
of tautologies like BETWEEN 0 and -1.
-      List<String> allBrokerList = new 
ArrayList<>(_pinotHelixResourceManager.getAllBrokerTenantNames());
-      brokerTenant = allBrokerList.get(RANDOM.nextInt(allBrokerList.size()));
+      instanceIds = _pinotHelixResourceManager.getAllBrokerInstances();
       LOGGER.error("Unable to find table name from SQL {} thus dispatching to 
random broker.", query);
     }
-    List<String> instanceIds = new 
ArrayList<>(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(brokerTenant));
-
-    if (instanceIds.isEmpty()) {
-      return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
-    }
-
-    instanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
-    if (instanceIds.isEmpty()) {
-      return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
-    }
-
-    // Send query to a random broker.
-    String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size()));
+    String instanceId = selectRandomInstanceId(instanceIds);
     return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, 
httpHeaders);
   }
 
@@ -265,18 +251,7 @@ public class PinotQueryResource {
 
     // Get brokers for the resource table.
     List<String> instanceIds = 
_pinotHelixResourceManager.getBrokerInstancesFor(rawTableName);
-    if (instanceIds.isEmpty()) {
-      return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
-    }
-
-    // Retain only online brokers.
-    instanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
-    if (instanceIds.isEmpty()) {
-      return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
-    }
-
-    // Send query to a random broker.
-    String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size()));
+    String instanceId = selectRandomInstanceId(instanceIds);
     return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, 
httpHeaders);
   }
 
@@ -299,16 +274,36 @@ public class PinotQueryResource {
     return allTableConfigList;
   }
 
-  // return the brokerTenant if all table configs point to the same broker, 
else returns null
-  private String getCommonBrokerTenant(List<TableConfig> tableConfigList) {
-    Set<String> tableBrokers = new HashSet<>();
-    for (TableConfig tableConfig : tableConfigList) {
-      tableBrokers.add(tableConfig.getTenantConfig().getBroker());
+  private String selectRandomInstanceId(List<String> instanceIds) {
+    if (instanceIds.isEmpty()) {
+      return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
+    }
+
+    instanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
+    if (instanceIds.isEmpty()) {
+      return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
     }
-    if (tableBrokers.size() != 1) {
-      return null;
+
+    // Send query to a random broker.
+    return instanceIds.get(RANDOM.nextInt(instanceIds.size()));
+  }
+
+  private List<String> findCommonBrokerInstances(Set<String> brokerTenants) {
+    Stream<InstanceConfig> brokerInstanceConfigs = 
_pinotHelixResourceManager.getAllBrokerInstanceConfigs().stream();
+    for (String brokerTenant : brokerTenants) {
+      brokerInstanceConfigs = brokerInstanceConfigs.filter(
+          instanceConfig -> 
instanceConfig.containsTag(TagNameUtils.getBrokerTagForTenant(brokerTenant)));
+    }
+    return 
brokerInstanceConfigs.map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+  }
+
+  // return the union of brokerTenants from the tables list.
+  private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList) 
{
+    Set<String> tableBrokerTenants = new HashSet<>();
+    for (TableConfig tableConfig : tableConfigList) {
+      tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
     }
-    return (String) (tableBrokers.toArray()[0]);
+    return tableBrokerTenants;
   }
 
   private String sendRequestToBroker(String query, String instanceId, String 
traceEnabled, String queryOptions,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ad784b2672..d584136740 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -467,6 +467,16 @@ public class PinotHelixResourceManager {
         TagNameUtils.getBrokerTagForTenant(brokerTenantName));
   }
 
+  public List<String> getAllBrokerInstances() {
+    return HelixHelper.getAllInstances(_helixAdmin, _helixClusterName).stream()
+        .filter(InstanceTypeUtils::isBroker).collect(Collectors.toList());
+  }
+
+  public List<InstanceConfig> getAllBrokerInstanceConfigs() {
+    return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
+        .filter(instance -> 
InstanceTypeUtils.isBroker(instance.getId())).collect(Collectors.toList());
+  }
+
   public List<InstanceConfig> getAllControllerInstanceConfigs() {
     return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
         .filter(instance -> 
InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to