This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0984168988 [multistage] support multi-tenant lookup for v2 engine (#11241)
0984168988 is described below
commit 0984168988551b97c5a2884f3d2588e75975ff3b
Author: Rong Rong <[email protected]>
AuthorDate: Fri Aug 4 12:52:17 2023 -0700
[multistage] support multi-tenant lookup for v2 engine (#11241)
* support multi-tenant lookup for v2 engine
---------
Co-authored-by: Rong Rong <[email protected]>
---
.../api/resources/PinotQueryResource.java | 83 ++++++++++------------
.../helix/core/PinotHelixResourceManager.java | 10 +++
2 files changed, 49 insertions(+), 44 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 2b48acd5f7..a913ca33ec 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -38,6 +38,7 @@ import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.GET;
@@ -56,6 +57,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControl;
@@ -202,7 +204,7 @@ public class PinotQueryResource {
QueryEnvironment queryEnvironment = new QueryEnvironment(new
TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new
PinotCatalog(_pinotHelixResourceManager.getTableCache())), null, null);
List<String> tableNames = queryEnvironment.getTableNamesForQuery(query);
- String brokerTenant;
+ List<String> instanceIds;
if (tableNames.size() != 0) {
List<TableConfig> tableConfigList = getListTableConfigs(tableNames);
if (tableConfigList == null || tableConfigList.size() == 0) {
@@ -210,35 +212,19 @@ public class PinotQueryResource {
new Exception("Unable to find table in cluster, table does not
exist")).toString();
}
- // When routing a query, there should be at least one common broker
tenant for the table. However, the server
- // tenants can be completely disjoint. The leaf stages which access
segments will be processed on the respective
- // server tenants for each table. The intermediate stages can be
processed in either or all of the server tenants
- // belonging to the tables.
- brokerTenant = getCommonBrokerTenant(tableConfigList);
- if (brokerTenant == null) {
+ // find the unions of all the broker tenant tags of the queried tables.
+ Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList);
+ if (brokerTenantsUnion.isEmpty()) {
return
QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR, new
Exception(
- String.format("Unable to dispatch multistage query with multiple
tables : %s " + "on different tenant",
- tableNames))).toString();
+ String.format("Unable to dispatch multistage query for tables:
[%s]", tableNames))).toString();
}
+ instanceIds = findCommonBrokerInstances(brokerTenantsUnion);
} else {
// TODO fail these queries going forward. Added this logic to take care
of tautologies like BETWEEN 0 and -1.
- List<String> allBrokerList = new
ArrayList<>(_pinotHelixResourceManager.getAllBrokerTenantNames());
- brokerTenant = allBrokerList.get(RANDOM.nextInt(allBrokerList.size()));
+ instanceIds = _pinotHelixResourceManager.getAllBrokerInstances();
LOGGER.error("Unable to find table name from SQL {} thus dispatching to
random broker.", query);
}
- List<String> instanceIds = new
ArrayList<>(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(brokerTenant));
-
- if (instanceIds.isEmpty()) {
- return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
- }
-
- instanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
- if (instanceIds.isEmpty()) {
- return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
- }
-
- // Send query to a random broker.
- String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size()));
+ String instanceId = selectRandomInstanceId(instanceIds);
return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions,
httpHeaders);
}
@@ -265,18 +251,7 @@ public class PinotQueryResource {
// Get brokers for the resource table.
List<String> instanceIds =
_pinotHelixResourceManager.getBrokerInstancesFor(rawTableName);
- if (instanceIds.isEmpty()) {
- return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
- }
-
- // Retain only online brokers.
- instanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
- if (instanceIds.isEmpty()) {
- return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
- }
-
- // Send query to a random broker.
- String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size()));
+ String instanceId = selectRandomInstanceId(instanceIds);
return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions,
httpHeaders);
}
@@ -299,16 +274,36 @@ public class PinotQueryResource {
return allTableConfigList;
}
- // return the brokerTenant if all table configs point to the same broker,
else returns null
- private String getCommonBrokerTenant(List<TableConfig> tableConfigList) {
- Set<String> tableBrokers = new HashSet<>();
- for (TableConfig tableConfig : tableConfigList) {
- tableBrokers.add(tableConfig.getTenantConfig().getBroker());
+ private String selectRandomInstanceId(List<String> instanceIds) {
+ if (instanceIds.isEmpty()) {
+ return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
+ }
+
+ instanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
+ if (instanceIds.isEmpty()) {
+ return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
}
- if (tableBrokers.size() != 1) {
- return null;
+
+ // Send query to a random broker.
+ return instanceIds.get(RANDOM.nextInt(instanceIds.size()));
+ }
+
+ private List<String> findCommonBrokerInstances(Set<String> brokerTenants) {
+ Stream<InstanceConfig> brokerInstanceConfigs =
_pinotHelixResourceManager.getAllBrokerInstanceConfigs().stream();
+ for (String brokerTenant : brokerTenants) {
+ brokerInstanceConfigs = brokerInstanceConfigs.filter(
+ instanceConfig ->
instanceConfig.containsTag(TagNameUtils.getBrokerTagForTenant(brokerTenant)));
+ }
+ return
brokerInstanceConfigs.map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+ }
+
+ // return the union of brokerTenants from the tables list.
+ private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList)
{
+ Set<String> tableBrokerTenants = new HashSet<>();
+ for (TableConfig tableConfig : tableConfigList) {
+ tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
}
- return (String) (tableBrokers.toArray()[0]);
+ return tableBrokerTenants;
}
private String sendRequestToBroker(String query, String instanceId, String
traceEnabled, String queryOptions,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ad784b2672..d584136740 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -467,6 +467,16 @@ public class PinotHelixResourceManager {
TagNameUtils.getBrokerTagForTenant(brokerTenantName));
}
+ public List<String> getAllBrokerInstances() {
+ return HelixHelper.getAllInstances(_helixAdmin, _helixClusterName).stream()
+ .filter(InstanceTypeUtils::isBroker).collect(Collectors.toList());
+ }
+
+ public List<InstanceConfig> getAllBrokerInstanceConfigs() {
+ return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
+ .filter(instance ->
InstanceTypeUtils.isBroker(instance.getId())).collect(Collectors.toList());
+ }
+
public List<InstanceConfig> getAllControllerInstanceConfigs() {
return HelixHelper.getInstanceConfigs(_helixZkManager).stream()
.filter(instance ->
InstanceTypeUtils.isController(instance.getId())).collect(Collectors.toList());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]