This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5b2b90d939d [federation] Add multi-cluster routing support for SSE
queries (#17439)
5b2b90d939d is described below
commit 5b2b90d939d2a26b08f631021a82fe9152294308
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Dec 29 20:23:34 2025 -0800
[federation] Add multi-cluster routing support for SSE queries (#17439)
* [federation] Add multi-cluster routing support for SSE queries
* Addressed comments
---------
Co-authored-by: shauryachats <[email protected]>
---
.../broker/broker/helix/BaseBrokerStarter.java | 10 +-
.../requesthandler/BaseBrokerRequestHandler.java | 6 +-
.../BaseSingleStageBrokerRequestHandler.java | 15 ++-
.../requesthandler/GrpcBrokerRequestHandler.java | 6 +-
.../MultiStageBrokerRequestHandler.java | 6 +-
.../SingleConnectionBrokerRequestHandler.java | 6 +-
.../requesthandler/TimeSeriesRequestHandler.java | 6 +-
.../BaseSingleStageBrokerRequestHandlerTest.java | 2 +-
.../LiteralOnlyBrokerRequestTest.java | 6 +-
.../common/utils/LogicalTableConfigUtils.java | 32 ++++---
.../common/utils/config/QueryOptionsUtils.java | 5 +
.../core/routing/LogicalTableRouteProvider.java | 50 +++++++++-
.../core/routing/MultiClusterRoutingContext.java | 25 +++++
.../multicluster/MultiClusterIntegrationTest.java | 103 ++++++++++++++++++++-
.../apache/pinot/spi/data/PhysicalTableConfig.java | 15 +++
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
16 files changed, 253 insertions(+), 42 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 6bb740339f7..7b1e6ee802d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -391,7 +391,8 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
if
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
{
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
- _accessControlFactory, _queryQuotaManager, _tableCache,
_failureDetector, _threadAccountant);
+ _accessControlFactory, _queryQuotaManager, _tableCache,
_failureDetector, _threadAccountant,
+ multiClusterRoutingContext);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf,
Broker.BROKER_NETTY_PREFIX);
@@ -403,7 +404,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, _tableCache,
nettyDefaults, tlsDefaults,
- _serverRoutingStatsManager, _failureDetector, _threadAccountant);
+ _serverRoutingStatsManager, _failureDetector, _threadAccountant,
multiClusterRoutingContext);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
@@ -417,14 +418,15 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, _tableCache,
_multiStageQueryThrottler, _failureDetector,
- _threadAccountant);
+ _threadAccountant, multiClusterRoutingContext);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if
(StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
{
Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be
enabled to use time-series engine");
timeSeriesRequestHandler =
new TimeSeriesRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
- _accessControlFactory, _queryQuotaManager, _tableCache,
queryDispatcher, _threadAccountant);
+ _accessControlFactory, _queryQuotaManager, _tableCache,
queryDispatcher, _threadAccountant,
+ multiClusterRoutingContext);
}
LOGGER.info("Initializing PinotFSFactory");
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5646068a648..86f56aa03f2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -52,6 +52,7 @@ import
org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.auth.AuthorizationResult;
@@ -95,6 +96,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
protected final boolean _enableQueryCancellation;
@Nullable
protected final String _enableAutoRewriteAggregationType;
+ @Nullable
+ protected final MultiClusterRoutingContext _multiClusterRoutingContext;
/**
* Maps broker-generated query id to the query string.
@@ -108,7 +111,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- ThreadAccountant threadAccountant) {
+ ThreadAccountant threadAccountant, MultiClusterRoutingContext
multiClusterRoutingContext) {
_config = config;
_brokerId = brokerId;
_requestIdGenerator = requestIdGenerator;
@@ -117,6 +120,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
_queryQuotaManager = queryQuotaManager;
_tableCache = tableCache;
_threadAccountant = threadAccountant;
+ _multiClusterRoutingContext = multiClusterRoutingContext;
_brokerMetrics = BrokerMetrics.get();
_brokerQueryEventListener =
BrokerQueryEventListenerFactory.getBrokerQueryEventListener();
_trackedHeaders = BrokerQueryEventListenerFactory.getTrackedHeaders();
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index ff978892388..bc0b8562e3a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -91,6 +91,7 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.routing.ImplicitHybridTableRouteProvider;
import org.apache.pinot.core.routing.LogicalTableRouteProvider;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.TableRouteInfo;
import org.apache.pinot.core.routing.TableRouteProvider;
@@ -183,9 +184,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String
brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- ThreadAccountant threadAccountant) {
+ ThreadAccountant threadAccountant, MultiClusterRoutingContext
multiClusterRoutingContext) {
super(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache,
- threadAccountant);
+ threadAccountant, multiClusterRoutingContext);
_disableGroovy = _config.getProperty(Broker.DISABLE_GROOVY,
Broker.DEFAULT_DISABLE_GROOVY);
_useApproximateFunction =
_config.getProperty(Broker.USE_APPROXIMATE_FUNCTION, false);
_defaultHllLog2m =
_config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
@@ -216,7 +217,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
_implicitHybridTableRouteProvider = new ImplicitHybridTableRouteProvider();
- _logicalTableRouteProvider = new LogicalTableRouteProvider();
+ _logicalTableRouteProvider = new
LogicalTableRouteProvider(multiClusterRoutingContext);
LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query
response limit: {}, "
+ "default query limit {}, query log max length: {}, query log max
rate: {}, query cancellation "
@@ -525,8 +526,12 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
routeProvider = _implicitHybridTableRouteProvider;
}
+ // Get the appropriate routing manager based on query options
+ RoutingManager selectedRoutingManager = _multiClusterRoutingContext !=
null && logicalTableConfig != null
+ ?
_multiClusterRoutingContext.getRoutingManager(pinotQuery.getQueryOptions())
+ : _routingManager;
// Get the tables hit by the request
- TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName,
_tableCache, _routingManager);
+ TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName,
_tableCache, selectedRoutingManager);
if (!routeInfo.isExists()) {
LOGGER.info("Table not found for request {}: {}", requestId, query);
@@ -650,7 +655,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
// Calculate routing table for the query
// TODO: Modify RoutingManager interface to directly take PinotQuery
long routingStartTimeNs = System.nanoTime();
- routeProvider.calculateRoutes(routeInfo, _routingManager,
offlineBrokerRequest, realtimeBrokerRequest,
+ routeProvider.calculateRoutes(routeInfo, selectedRoutingManager,
offlineBrokerRequest, realtimeBrokerRequest,
requestId);
Set<ServerInstance> offlineExecutionServers =
routeInfo.getOfflineExecutionServers();
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index e97eab96d04..b593a63ed37 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
import org.apache.pinot.common.utils.grpc.ServerGrpcRequestBuilder;
import org.apache.pinot.core.query.reduce.StreamingReduceService;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.SegmentsToQuery;
import org.apache.pinot.core.routing.TableRouteInfo;
@@ -66,9 +67,10 @@ public class GrpcBrokerRequestHandler extends
BaseSingleStageBrokerRequestHandle
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- FailureDetector failureDetector, ThreadAccountant threadAccountant) {
+ FailureDetector failureDetector, ThreadAccountant threadAccountant,
+ MultiClusterRoutingContext multiClusterRoutingContext) {
super(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache,
- threadAccountant);
+ threadAccountant, multiClusterRoutingContext);
_streamingReduceService = new StreamingReduceService(config);
_streamingQueryClient = new
PinotServerStreamingQueryClient(GrpcConfig.buildGrpcQueryConfig(config));
_failureDetector = failureDetector;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 68bee89bf32..710ba7cccb6 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -71,6 +71,7 @@ import org.apache.pinot.common.utils.Timer;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.QueryFingerprintUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.ImmutableQueryEnvironment;
@@ -139,9 +140,10 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- MultiStageQueryThrottler queryThrottler, FailureDetector
failureDetector, ThreadAccountant threadAccountant) {
+ MultiStageQueryThrottler queryThrottler, FailureDetector
failureDetector, ThreadAccountant threadAccountant,
+ MultiClusterRoutingContext multiClusterRoutingContext) {
super(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache,
- threadAccountant);
+ threadAccountant, multiClusterRoutingContext);
String hostname =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port =
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(_brokerId, hostname, port,
_routingManager);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 8e8e7cd85aa..9f4d51252fe 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.TableRouteInfo;
import org.apache.pinot.core.transport.AsyncQueryResponse;
@@ -73,9 +74,10 @@ public class SingleConnectionBrokerRequestHandler extends
BaseSingleStageBrokerR
BrokerRequestIdGenerator requestIdGenerator, RoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
NettyConfig nettyConfig, TlsConfig tlsConfig, ServerRoutingStatsManager
serverRoutingStatsManager,
- FailureDetector failureDetector, ThreadAccountant threadAccountant) {
+ FailureDetector failureDetector, ThreadAccountant threadAccountant,
+ MultiClusterRoutingContext multiClusterRoutingContext) {
super(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache,
- threadAccountant);
+ threadAccountant, multiClusterRoutingContext);
_brokerReduceService = new BrokerReduceService(_config);
_queryRouter = new QueryRouter(_brokerId, nettyConfig, tlsConfig,
serverRoutingStatsManager, threadAccountant);
_failureDetector = failureDetector;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 8306c4bd33b..309a53dddc3 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -52,6 +52,7 @@ import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.utils.HumanReadableDuration;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.auth.AuthorizationResult;
@@ -85,9 +86,10 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, BrokerRoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- QueryDispatcher queryDispatcher, ThreadAccountant threadAccountant) {
+ QueryDispatcher queryDispatcher, ThreadAccountant threadAccountant,
+ MultiClusterRoutingContext multiClusterRoutingContext) {
super(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache,
- threadAccountant);
+ threadAccountant, multiClusterRoutingContext);
_queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager,
tableCache);
_queryEnvironment.init(config);
_queryDispatcher = queryDispatcher;
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
index 3df8c7bcc8f..6cce23c0e80 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
@@ -191,7 +191,7 @@ public class BaseSingleStageBrokerRequestHandlerTest {
BaseSingleStageBrokerRequestHandler requestHandler =
new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", new
BrokerRequestIdGenerator(), routingManager,
new AllowAllAccessControlFactory(), queryQuotaManager, tableCache,
- ThreadAccountantUtils.getNoOpAccountant()) {
+ ThreadAccountantUtils.getNoOpAccountant(), null) {
@Override
public void start() {
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index ab588aeb7bc..f0038143cd5 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -174,7 +174,7 @@ public class LiteralOnlyBrokerRequestTest {
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId",
new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY,
null, null, null, null,
mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
- ThreadAccountantUtils.getNoOpAccountant());
+ ThreadAccountantUtils.getNoOpAccountant(), null);
long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
@@ -200,7 +200,7 @@ public class LiteralOnlyBrokerRequestTest {
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId",
new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY,
null, null, null, null,
mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
- ThreadAccountantUtils.getNoOpAccountant());
+ ThreadAccountantUtils.getNoOpAccountant(), null);
long currentTsMin = System.currentTimeMillis();
BrokerResponse brokerResponse = requestHandler.handleRequest(
"SELECT now() AS currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd
z') AS firstDayOf2020");
@@ -356,7 +356,7 @@ public class LiteralOnlyBrokerRequestTest {
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId",
new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY,
null, null, null, null,
mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
- ThreadAccountantUtils.getNoOpAccountant());
+ ThreadAccountantUtils.getNoOpAccountant(), null);
// Test 1: select constant
BrokerResponse brokerResponse = requestHandler.handleRequest("EXPLAIN PLAN
FOR SELECT 1.5, 'test'");
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index e1ac8e352e1..9e1bbc3d8f0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -147,20 +147,6 @@ public class LogicalTableConfigUtils {
String physicalTableName = entry.getKey();
PhysicalTableConfig physicalTableConfig = entry.getValue();
- // validate database name matches
- String physicalTableDatabaseName =
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalTableName);
- if (!StringUtils.equalsIgnoreCase(databaseName,
physicalTableDatabaseName)) {
- throw new IllegalArgumentException(
- "Invalid logical table. Reason: '" + physicalTableName
- + "' should have the same database name as logical table: " +
databaseName + " != "
- + physicalTableDatabaseName);
- }
-
- // validate physical table exists
- if (!physicalTableExistsPredicate.test(physicalTableName)) {
- throw new IllegalArgumentException(
- "Invalid logical table. Reason: '" + physicalTableName + "' should
be one of the existing tables");
- }
// validate physical table config is not null
if (physicalTableConfig == null) {
throw new IllegalArgumentException(
@@ -168,6 +154,24 @@ public class LogicalTableConfigUtils {
+ physicalTableName);
}
+ // Skip database and existence validation for multi-cluster physical
tables
+ if (!physicalTableConfig.isMultiCluster()) {
+ // validate database name matches
+ String physicalTableDatabaseName =
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalTableName);
+ if (!StringUtils.equalsIgnoreCase(databaseName,
physicalTableDatabaseName)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: '" + physicalTableName
+ + "' should have the same database name as logical table: "
+ databaseName + " != "
+ + physicalTableDatabaseName);
+ }
+
+ // validate physical table exists
+ if (!physicalTableExistsPredicate.test(physicalTableName)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: '" + physicalTableName + "'
should be one of the existing tables");
+ }
+ }
+
if (TableNameBuilder.isOfflineTableResource(physicalTableName)) {
offlineTableNames.add(physicalTableName);
} else if (TableNameBuilder.isRealtimeTableResource(physicalTableName)) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5ef28bde147..6cd5f1da334 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -475,6 +475,11 @@ public class QueryOptionsUtils {
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}
+  /**
+   * Reads the {@code QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING} query option.
+   *
+   * @param queryOptions query options of the request
+   * @param defaultValue value to return when the option is not present
+   * @return the parsed boolean value of the option, or {@code defaultValue} when it is absent
+   */
+  public static boolean isMultiClusterRoutingEnabled(Map<String, String> queryOptions, boolean defaultValue) {
+    String rawValue = queryOptions.get(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING);
+    if (rawValue == null) {
+      return defaultValue;
+    }
+    return Boolean.parseBoolean(rawValue);
+  }
+
public static boolean isUseLiteMode(Map<String, String> queryOptions,
boolean defaultValue) {
String option = queryOptions.get(QueryOptionKey.USE_LITE_MODE);
return option != null ? Boolean.parseBoolean(option) : defaultValue;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
index db359e404dd..2ad5fb6b2de 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.routing;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
@@ -29,6 +30,7 @@ import
org.apache.pinot.core.routing.timeboundary.TimeBoundaryStrategyService;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,39 @@ import org.slf4j.LoggerFactory;
public class LogicalTableRouteProvider implements TableRouteProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(LogicalTableRouteProvider.class);
+ @Nullable
+ private final MultiClusterRoutingContext _multiClusterRoutingContext;
+
+  /** Creates a provider without a multi-cluster routing context, so only the local table cache is consulted. */
+  public LogicalTableRouteProvider() {
+    this(null);
+  }
+
+  /**
+   * Creates a provider backed by the given multi-cluster routing context.
+   *
+   * @param multiClusterRoutingContext context whose table caches are searched when a physical table is missing from
+   *                                   the local cache; {@code null} disables that fallback
+   */
+  public LogicalTableRouteProvider(@Nullable MultiClusterRoutingContext multiClusterRoutingContext) {
+    _multiClusterRoutingContext = multiClusterRoutingContext;
+  }
+
+  /**
+   * Locates the {@link TableCache} that holds the table config for {@code tableName}.
+   *
+   * <p>The local cache is checked first. If it does not know the table and a multi-cluster routing context is
+   * configured, the context's table caches are searched in map iteration order and the first cache containing the
+   * table config is returned.
+   *
+   * @param tableName physical table name to look up
+   * @param localTableCache table cache of the local cluster, always consulted first
+   * @return the cache that contains the table config, or {@code null} if no known cache contains it
+   */
+  @Nullable
+  private TableCache findTableCache(String tableName, TableCache localTableCache) {
+    if (localTableCache.getTableConfig(tableName) != null) {
+      return localTableCache;
+    }
+
+    if (_multiClusterRoutingContext != null) {
+      var federatedCaches = _multiClusterRoutingContext.getTableCacheMap();
+      if (federatedCaches != null) {
+        for (TableCache federatedCache : federatedCaches.values()) {
+          // The context may also list the local cache; it was already checked above, so skip it.
+          if (federatedCache != null && federatedCache != localTableCache
+              && federatedCache.getTableConfig(tableName) != null) {
+            return federatedCache;
+          }
+        }
+      }
+    }
+
+    // Callers skip tables that are not found anywhere, so leave a trace to make missing tables diagnosable.
+    LOGGER.debug("Table config for table: {} not found in any table cache", tableName);
+    return null;
+  }
@Override
public TableRouteInfo getTableRouteInfo(String tableName, TableCache
tableCache, RoutingManager routingManager) {
@@ -56,11 +91,22 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
List<ImplicitHybridTableRouteInfo> offlineTables = new ArrayList<>();
List<ImplicitHybridTableRouteInfo> realtimeTables = new ArrayList<>();
- for (String physicalTableName :
logicalTableConfig.getPhysicalTableConfigMap().keySet()) {
+ for (var physicalTableEntrySet :
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+ String physicalTableName = physicalTableEntrySet.getKey();
+ PhysicalTableConfig physicalTableConfig =
physicalTableEntrySet.getValue();
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
Preconditions.checkNotNull(tableType);
+
+ // Selecting any table cache containing the physical table is acceptable
for federation since
+ // fillTableConfigMetadata only reads metadata from table config, and if
a table exists in multiple
+ // federated clusters, the table configs should be consistent.
+ TableCache selectedTableCache = findTableCache(physicalTableName,
tableCache);
+ if (selectedTableCache == null) {
+ // if the table is not found in any cache, ignore it.
+ continue;
+ }
ImplicitHybridTableRouteInfo physicalTableInfo = new
ImplicitHybridTableRouteInfo();
- routeProvider.fillTableConfigMetadata(physicalTableInfo,
physicalTableName, tableCache);
+ routeProvider.fillTableConfigMetadata(physicalTableInfo,
physicalTableName, selectedTableCache);
if (physicalTableInfo.isExists()) {
if (tableType == TableType.OFFLINE) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
index 1f139da56ac..251e84ee048 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.routing;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
/**
@@ -52,4 +53,28 @@ public class MultiClusterRoutingContext {
_localRoutingManager = localRoutingManager;
_multiClusterRoutingManager = multiClusterRoutingManager;
}
+
+  /**
+   * Returns the table caches known to this context, keyed by cluster name.
+   *
+   * <p>This is the context's own map instance, not a copy; callers should treat it as read-only.
+   */
+  public Map<String, TableCache> getTableCacheMap() {
+    return _tableCacheMap;
+  }
+
+  /**
+   * Returns the table cache registered for {@code clusterName}.
+   *
+   * @param clusterName name of the cluster
+   * @return the cluster's table cache, or {@code null} if no cache is registered under that name
+   */
+  @Nullable
+  public TableCache getTableCache(String clusterName) {
+    return _tableCacheMap.get(clusterName);
+  }
+
+  /**
+   * Returns the routing manager to use for a query.
+   *
+   * <p>The multi-cluster routing manager is returned only when the query enables the
+   * {@code QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING} option and a multi-cluster routing manager is configured.
+   * In every other case, including a {@code null} options map, the local routing manager is returned.
+   *
+   * @param queryOptions query options of the request; may be {@code null} when no options were set
+   * @return the routing manager to use for the query
+   */
+  public RoutingManager getRoutingManager(@Nullable Map<String, String> queryOptions) {
+    // QueryOptionsUtils dereferences the map directly, so guard against requests without an options map.
+    if (queryOptions != null && _multiClusterRoutingManager != null
+        && QueryOptionsUtils.isMultiClusterRoutingEnabled(queryOptions, false)) {
+      return _multiClusterRoutingManager;
+    }
+    return _localRoutingManager;
+  }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index e9ac332e2e9..e2857348a85 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.integration.tests.ClusterTest;
@@ -46,6 +47,8 @@ import
org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -54,6 +57,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
@@ -76,6 +81,13 @@ public class MultiClusterIntegrationTest extends ClusterTest {
protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
protected static final ClusterConfig CLUSTER_1_CONFIG = new
ClusterConfig(CLUSTER_1_NAME, 30000);
protected static final ClusterConfig CLUSTER_2_CONFIG = new
ClusterConfig(CLUSTER_2_NAME, 40000);
+ // Broker tenant assigned to the logical tables created by the logical-table helpers below.
+ protected static final String DEFAULT_TENANT = "DefaultTenant";
+ // Logical table names. LOGICAL_TABLE_NAME_2 is dropped defensively in testLogicalFederationTwoOfflineTablesSSE.
+ protected static final String LOGICAL_TABLE_NAME = "logical_table";
+ protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
+ // Physical tables backing the federated logical table: the first is created on cluster 1, the second on cluster 2.
+ protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE =
"logical_federation_table_cluster1";
+ protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE =
"logical_federation_table_cluster2";
+ // Number of rows generated for each cluster. They differ so a federated COUNT(*) (their sum) is distinguishable
+ // from either single-cluster count.
+ protected static final int CLUSTER_1_SIZE = 1500;
+ protected static final int CLUSTER_2_SIZE = 1000;
protected ClusterComponents _cluster1;
protected ClusterComponents _cluster2;
@@ -127,8 +139,8 @@ public class MultiClusterIntegrationTest extends ClusterTest {
createSchemaAndTableOnBothClusters(testTableName);
// Create and load test data into both clusters
- _cluster1AvroFiles = createAvroData(100, 1);
- _cluster2AvroFiles = createAvroData(100, 2);
+ _cluster1AvroFiles = createAvroData(CLUSTER_1_SIZE, 1);
+ _cluster2AvroFiles = createAvroData(CLUSTER_2_SIZE, 2);
loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
@@ -138,17 +150,36 @@ public class MultiClusterIntegrationTest extends ClusterTest {
String result1 = executeQuery(query, _cluster1);
assertNotNull(result1, "Query result from cluster 1 should not be null");
long count1 = parseCountResult(result1);
- assertEquals(count1, 100, "Cluster 1 should have 100 records");
+ assertEquals(count1, CLUSTER_1_SIZE);
// Verify cluster 2 is queryable
String result2 = executeQuery(query, _cluster2);
assertNotNull(result2, "Query result from cluster 2 should not be null");
long count2 = parseCountResult(result2);
- assertEquals(count2, 100, "Cluster 2 should have 100 records");
+ assertEquals(count2, CLUSTER_2_SIZE);
LOGGER.info("Multi-cluster broker test passed: both clusters started and
queryable");
}
+ /**
+ * Verifies that a logical table spanning one offline physical table per cluster returns the combined row count
+ * through either cluster's broker when {@code enableMultiClusterRouting=true} is set.
+ * <p>Each cluster holds a logical table named {@link #LOGICAL_TABLE_NAME} that lists both physical tables. Cluster 1
+ * is loaded with {@link #CLUSTER_1_SIZE} rows and cluster 2 with {@link #CLUSTER_2_SIZE} rows, so each broker must
+ * report their sum.
+ */
+ @Test
+ public void testLogicalFederationTwoOfflineTablesSSE() throws Exception {
+ // Remove leftovers from earlier runs or tests on both clusters before recreating anything.
+ dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster1._controllerBaseApiUrl);
+ dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster2._controllerBaseApiUrl);
+ dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster1._controllerBaseApiUrl);
+ dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster2._controllerBaseApiUrl);
+ // Create the per-cluster physical tables, then the logical table over both of them on each cluster.
+ setupFirstLogicalFederatedTable();
+ createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
+ LOGICAL_FEDERATION_CLUSTER_1_TABLE,
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+ // Regenerate segments from scratch so no data from earlier tests is re-uploaded.
+ cleanSegmentDirs();
+ _cluster1AvroFiles = createAvroData(CLUSTER_1_SIZE, 1);
+ _cluster2AvroFiles = createAvroData(CLUSTER_2_SIZE, 2);
+ loadDataIntoCluster(_cluster1AvroFiles,
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
+ loadDataIntoCluster(_cluster2AvroFiles,
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
+ // With multi-cluster routing enabled, both brokers should see the rows of both clusters.
+ long expectedTotal = CLUSTER_1_SIZE + CLUSTER_2_SIZE;
+ assertEquals(getCount(LOGICAL_TABLE_NAME, _cluster1, true), expectedTotal);
+ assertEquals(getCount(LOGICAL_TABLE_NAME, _cluster2, true), expectedTotal);
+ }
+
@Override
protected BaseBrokerStarter createBrokerStarter() {
return new MultiClusterHelixBrokerStarter();
@@ -487,4 +518,68 @@ public class MultiClusterIntegrationTest extends ClusterTest {
LOGGER.warn("Error stopping cluster", e);
}
}
+
+ /**
+ * Passes the segment and tar directories of both clusters to {@code cleanDirectories} (presumably emptying them),
+ * so that a test can build and upload fresh segments.
+ */
+ protected void cleanSegmentDirs() {
+ cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir,
_cluster2._segmentDir, _cluster2._tarDir);
+ }
+
+ /**
+ * Runs {@code SELECT COUNT(*)} against {@code tableName} through the given cluster and returns the parsed count.
+ * <p>The query is prefixed with {@code SET enableMultiClusterRouting=<flag>;} so the caller controls whether
+ * multi-cluster routing is requested for this query.
+ *
+ * @param tableName table (physical or logical) to count; concatenated directly into the SQL (test-only input)
+ * @param cluster cluster whose query endpoint executes the query
+ * @param enableMultiClusterRouting value for the {@code enableMultiClusterRouting} query option
+ * @return the row count parsed from the query response
+ */
+ protected long getCount(String tableName, ClusterComponents cluster, boolean
enableMultiClusterRouting)
+ throws Exception {
+ String query = "SET enableMultiClusterRouting=" +
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
+ + tableName;
+ return parseCountResult(executeQuery(query, cluster));
+ }
+
+ /*
+ Logical table helper methods
+ */
+ /**
+ * Creates a schema and a logical table, both named {@code logicalTable}, on the controller at
+ * {@code controllerBaseApiUrl}.
+ * <p>The schema is built from {@code schemaFile} via {@code createSchema}, renamed to {@code logicalTable} and
+ * uploaded. The logical table config is then POSTed to the controller's logical-table-create endpoint, and the
+ * response must match the expected success message exactly.
+ *
+ * @param schemaFile schema resource handed to {@code createSchema}
+ * @param physicalTableConfigMap physical table name (with type suffix) to its {@link PhysicalTableConfig}
+ * @param brokerTenant broker tenant of the logical table
+ * @param controllerBaseApiUrl base URL of the controller to create the schema and logical table on
+ * @param logicalTable name used for both the schema and the logical table
+ * @param refOfflineTable reference offline table of the logical table
+ * @param refRealtimeTable reference realtime table of the logical table; callers in this file pass {@code null}
+ * @throws IOException propagated from the schema/HTTP helpers
+ */
+ protected void createLogicalTable(String schemaFile,
+ Map<String, PhysicalTableConfig> physicalTableConfigMap, String
brokerTenant, String controllerBaseApiUrl,
+ String logicalTable, String refOfflineTable, String refRealtimeTable)
throws IOException {
+ ControllerRequestURLBuilder urlBuilder =
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
+ ControllerRequestClient client = new ControllerRequestClient(urlBuilder,
getHttpClient(),
+ getControllerRequestClientHeaders());
+ // A logical table needs a schema with the same name, so reuse the physical schema under the logical name.
+ Schema schema = createSchema(schemaFile);
+ schema.setSchemaName(logicalTable);
+ client.addSchema(schema);
+ LogicalTableConfig config = new LogicalTableConfigBuilder()
+ .setTableName(logicalTable)
+ .setBrokerTenant(brokerTenant)
+ .setRefOfflineTableName(refOfflineTable)
+ .setRefRealtimeTableName(refRealtimeTable)
+ .setPhysicalTableConfigMap(physicalTableConfigMap)
+ .build();
+ String response =
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
+ config.toSingleLineJsonString(), Map.of());
+ // NOTE(review): this compares the full JSON text, so any change to field order or extra response fields breaks
+ // the test; parsing the response and checking only the "status" field would be less brittle.
+ assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" +
logicalTable
+ + " logical table successfully added.\"}");
+ }
+
+ /**
+ * Creates the same logical table on both clusters, listing the offline physical table of each cluster.
+ * <p>Both physical tables are registered with {@code new PhysicalTableConfig(true)} (the {@code multiCluster}
+ * flag). Each cluster's logical table uses its own local physical table as the reference offline table and has no
+ * reference realtime table.
+ *
+ * @param logicalTableName name of the logical table to create on both clusters
+ * @param cluster1PhysicalTable raw name (without {@code _OFFLINE}) of the physical table on cluster 1
+ * @param cluster2PhysicalTable raw name (without {@code _OFFLINE}) of the physical table on cluster 2
+ * @throws IOException propagated from {@link #createLogicalTable}
+ */
+ protected void createLogicalTableOnBothClusters(String logicalTableName,
+ String cluster1PhysicalTable, String cluster2PhysicalTable) throws
IOException {
+ // Both clusters share one physical-table map, so either broker can see both physical tables.
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = Map.of(
+ cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
+ cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
+ );
+
+ // The reference table differs per cluster: each one points at the physical table that exists locally.
+ createLogicalTable(SCHEMA_FILE, physicalTableConfigMap, DEFAULT_TENANT,
+ _cluster1._controllerBaseApiUrl, logicalTableName,
cluster1PhysicalTable + "_OFFLINE", null);
+ createLogicalTable(SCHEMA_FILE, physicalTableConfigMap, DEFAULT_TENANT,
+ _cluster2._controllerBaseApiUrl, logicalTableName,
cluster2PhysicalTable + "_OFFLINE", null);
+ }
+
+ /**
+ * Requests deletion of a logical table by passing {@code <controller>/logicalTables/<name>} to
+ * {@code dropResource}. Presumably {@code dropResource} tolerates a missing table, which the "IfExists" name
+ * relies on; confirm.
+ *
+ * @param logicalTableName logical table to drop
+ * @param controllerBaseApiUrl base URL of the controller that owns the logical table
+ */
+ protected void dropLogicalTableIfExists(String logicalTableName, String
controllerBaseApiUrl) {
+ dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
+ }
+
+ /**
+ * (Re)creates the default pair of federated physical tables:
+ * {@link #LOGICAL_FEDERATION_CLUSTER_1_TABLE} on cluster 1 and {@link #LOGICAL_FEDERATION_CLUSTER_2_TABLE} on
+ * cluster 2.
+ */
+ protected void setupFirstLogicalFederatedTable() throws Exception {
+ setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE,
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+ }
+
+ /**
+ * Drops any existing table and schema with the given names, then creates a fresh schema and table on each
+ * cluster: {@code cluster1TableName} on cluster 1 and {@code cluster2TableName} on cluster 2.
+ *
+ * @param cluster1TableName physical table to (re)create on cluster 1
+ * @param cluster2TableName physical table to (re)create on cluster 2
+ */
+ protected void setupLogicalFederatedTable(String cluster1TableName, String
cluster2TableName) throws Exception {
+ dropTableAndSchemaIfExists(cluster1TableName,
_cluster1._controllerBaseApiUrl);
+ dropTableAndSchemaIfExists(cluster2TableName,
_cluster2._controllerBaseApiUrl);
+ createSchemaAndTableForCluster(cluster1TableName,
_cluster1._controllerBaseApiUrl);
+ createSchemaAndTableForCluster(cluster2TableName,
_cluster2._controllerBaseApiUrl);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
index c86fcf97dcf..af91f83e910 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.data;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.pinot.spi.config.BaseJsonConfig;
@@ -26,4 +27,18 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
* This is empty by design and more docs would be added as features are added.
*/
public class PhysicalTableConfig extends BaseJsonConfig {
+ // Serialized under the JSON key "multiCluster". Presumably marks a physical table that is served through
+ // multi-cluster (federated) routing rather than only the local cluster; confirm against the routing code.
+ // NOTE(review): there is no @JsonCreator, so Jackson presumably deserializes via the no-arg constructor and then
+ // writes this final field reflectively (MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS, enabled by default).
+ // An @JsonCreator constructor would make that contract explicit.
+ @JsonProperty("multiCluster")
+ private final boolean _isMultiCluster;
+
+ /** Creates a config with the {@code multiCluster} flag set to {@code false}. */
+ public PhysicalTableConfig() {
+ this(false);
+ }
+
+ /**
+ * Creates a config with the given {@code multiCluster} flag.
+ *
+ * @param isMultiCluster value stored as the {@code multiCluster} flag
+ */
+ public PhysicalTableConfig(boolean isMultiCluster) {
+ _isMultiCluster = isMultiCluster;
+ }
+
+ /** Returns the {@code multiCluster} flag of this physical table config. */
+ public boolean isMultiCluster() {
+ return _isMultiCluster;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3004cafdc73..69862016e8c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -865,6 +865,8 @@ public class CommonConstants {
// MAX(stringCol) -> MAXSTRING(stringCol)
// SUM(intCol) -> SUMINT(intCol)
public static final String AUTO_REWRITE_AGGREGATION_TYPE =
"autoRewriteAggregationType";
+ // When set to true, the broker routes the query through its multi-cluster (federated) routing manager, if one is
+ // configured; otherwise, or when unset, the local routing manager is used. Example: SET enableMultiClusterRouting=true;
+ public static final String ENABLE_MULTI_CLUSTER_ROUTING =
"enableMultiClusterRouting";
/// Option to customize the value of [Broker#CONFIG_OF_SORT_EXCHANGE_COPY_THRESHOLD]
public static final String SORT_EXCHANGE_COPY_THRESHOLD =
"sortExchangeCopyThreshold";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]