This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b2b90d939d [federation] Add multi-cluster routing support for SSE 
queries (#17439)
5b2b90d939d is described below

commit 5b2b90d939d2a26b08f631021a82fe9152294308
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Dec 29 20:23:34 2025 -0800

    [federation] Add multi-cluster routing support for SSE queries (#17439)
    
    * [federation] Add multi-cluster routing support for SSE queries
    
    * Addressed comments
    
    ---------
    
    Co-authored-by: shauryachats <[email protected]>
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  10 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |   6 +-
 .../BaseSingleStageBrokerRequestHandler.java       |  15 ++-
 .../requesthandler/GrpcBrokerRequestHandler.java   |   6 +-
 .../MultiStageBrokerRequestHandler.java            |   6 +-
 .../SingleConnectionBrokerRequestHandler.java      |   6 +-
 .../requesthandler/TimeSeriesRequestHandler.java   |   6 +-
 .../BaseSingleStageBrokerRequestHandlerTest.java   |   2 +-
 .../LiteralOnlyBrokerRequestTest.java              |   6 +-
 .../common/utils/LogicalTableConfigUtils.java      |  32 ++++---
 .../common/utils/config/QueryOptionsUtils.java     |   5 +
 .../core/routing/LogicalTableRouteProvider.java    |  50 +++++++++-
 .../core/routing/MultiClusterRoutingContext.java   |  25 +++++
 .../multicluster/MultiClusterIntegrationTest.java  | 103 ++++++++++++++++++++-
 .../apache/pinot/spi/data/PhysicalTableConfig.java |  15 +++
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 16 files changed, 253 insertions(+), 42 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 6bb740339f7..7b1e6ee802d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -391,7 +391,8 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     if 
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
 {
       singleStageBrokerRequestHandler =
           new GrpcBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
-              _accessControlFactory, _queryQuotaManager, _tableCache, 
_failureDetector, _threadAccountant);
+              _accessControlFactory, _queryQuotaManager, _tableCache, 
_failureDetector, _threadAccountant,
+              multiClusterRoutingContext);
     } else {
       // Default request handler type, i.e. netty
       NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, 
Broker.BROKER_NETTY_PREFIX);
@@ -403,7 +404,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       singleStageBrokerRequestHandler =
           new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
nettyDefaults, tlsDefaults,
-              _serverRoutingStatsManager, _failureDetector, _threadAccountant);
+              _serverRoutingStatsManager, _failureDetector, _threadAccountant, 
multiClusterRoutingContext);
     }
     MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
     QueryDispatcher queryDispatcher = null;
@@ -417,14 +418,15 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       multiStageBrokerRequestHandler =
           new MultiStageBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
_multiStageQueryThrottler, _failureDetector,
-              _threadAccountant);
+              _threadAccountant, multiClusterRoutingContext);
     }
     TimeSeriesRequestHandler timeSeriesRequestHandler = null;
     if 
(StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
 {
       Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be 
enabled to use time-series engine");
       timeSeriesRequestHandler =
           new TimeSeriesRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
-              _accessControlFactory, _queryQuotaManager, _tableCache, 
queryDispatcher, _threadAccountant);
+              _accessControlFactory, _queryQuotaManager, _tableCache, 
queryDispatcher, _threadAccountant,
+              multiClusterRoutingContext);
     }
 
     LOGGER.info("Initializing PinotFSFactory");
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5646068a648..86f56aa03f2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -52,6 +52,7 @@ import 
org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.auth.AuthorizationResult;
@@ -95,6 +96,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   protected final boolean _enableQueryCancellation;
   @Nullable
   protected final String _enableAutoRewriteAggregationType;
+  @Nullable
+  protected final MultiClusterRoutingContext _multiClusterRoutingContext;
 
   /**
    * Maps broker-generated query id to the query string.
@@ -108,7 +111,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId,
       BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      ThreadAccountant threadAccountant) {
+      ThreadAccountant threadAccountant, MultiClusterRoutingContext 
multiClusterRoutingContext) {
     _config = config;
     _brokerId = brokerId;
     _requestIdGenerator = requestIdGenerator;
@@ -117,6 +120,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     _queryQuotaManager = queryQuotaManager;
     _tableCache = tableCache;
     _threadAccountant = threadAccountant;
+    _multiClusterRoutingContext = multiClusterRoutingContext;
     _brokerMetrics = BrokerMetrics.get();
     _brokerQueryEventListener = 
BrokerQueryEventListenerFactory.getBrokerQueryEventListener();
     _trackedHeaders = BrokerQueryEventListenerFactory.getTrackedHeaders();
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index ff978892388..bc0b8562e3a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -91,6 +91,7 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.routing.ImplicitHybridTableRouteProvider;
 import org.apache.pinot.core.routing.LogicalTableRouteProvider;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.TableRouteInfo;
 import org.apache.pinot.core.routing.TableRouteProvider;
@@ -183,9 +184,9 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
   public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
       BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      ThreadAccountant threadAccountant) {
+      ThreadAccountant threadAccountant, MultiClusterRoutingContext 
multiClusterRoutingContext) {
     super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
-        threadAccountant);
+        threadAccountant, multiClusterRoutingContext);
     _disableGroovy = _config.getProperty(Broker.DISABLE_GROOVY, 
Broker.DEFAULT_DISABLE_GROOVY);
     _useApproximateFunction = 
_config.getProperty(Broker.USE_APPROXIMATE_FUNCTION, false);
     _defaultHllLog2m = 
_config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
@@ -216,7 +217,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
         Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
 
     _implicitHybridTableRouteProvider = new ImplicitHybridTableRouteProvider();
-    _logicalTableRouteProvider = new LogicalTableRouteProvider();
+    _logicalTableRouteProvider = new 
LogicalTableRouteProvider(multiClusterRoutingContext);
 
     LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query 
response limit: {}, "
             + "default query limit {}, query log max length: {}, query log max 
rate: {}, query cancellation "
@@ -525,8 +526,12 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       routeProvider = _implicitHybridTableRouteProvider;
     }
 
+    // Get the appropriate routing manager based on query options
+    RoutingManager selectedRoutingManager = _multiClusterRoutingContext != 
null && logicalTableConfig != null
+        ? 
_multiClusterRoutingContext.getRoutingManager(pinotQuery.getQueryOptions())
+        : _routingManager;
     // Get the tables hit by the request
-    TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName, 
_tableCache, _routingManager);
+    TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName, 
_tableCache, selectedRoutingManager);
 
     if (!routeInfo.isExists()) {
       LOGGER.info("Table not found for request {}: {}", requestId, query);
@@ -650,7 +655,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     // Calculate routing table for the query
     // TODO: Modify RoutingManager interface to directly take PinotQuery
     long routingStartTimeNs = System.nanoTime();
-    routeProvider.calculateRoutes(routeInfo, _routingManager, 
offlineBrokerRequest, realtimeBrokerRequest,
+    routeProvider.calculateRoutes(routeInfo, selectedRoutingManager, 
offlineBrokerRequest, realtimeBrokerRequest,
         requestId);
 
     Set<ServerInstance> offlineExecutionServers = 
routeInfo.getOfflineExecutionServers();
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index e97eab96d04..b593a63ed37 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.ServerGrpcRequestBuilder;
 import org.apache.pinot.core.query.reduce.StreamingReduceService;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.SegmentsToQuery;
 import org.apache.pinot.core.routing.TableRouteInfo;
@@ -66,9 +67,10 @@ public class GrpcBrokerRequestHandler extends 
BaseSingleStageBrokerRequestHandle
   public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId,
       BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      FailureDetector failureDetector, ThreadAccountant threadAccountant) {
+      FailureDetector failureDetector, ThreadAccountant threadAccountant,
+      MultiClusterRoutingContext multiClusterRoutingContext) {
     super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
-        threadAccountant);
+        threadAccountant, multiClusterRoutingContext);
     _streamingReduceService = new StreamingReduceService(config);
     _streamingQueryClient = new 
PinotServerStreamingQueryClient(GrpcConfig.buildGrpcQueryConfig(config));
     _failureDetector = failureDetector;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 68bee89bf32..710ba7cccb6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -71,6 +71,7 @@ import org.apache.pinot.common.utils.Timer;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.QueryFingerprintUtils;
 import org.apache.pinot.common.utils.tls.TlsUtils;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.ImmutableQueryEnvironment;
@@ -139,9 +140,10 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   public MultiStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
       BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      MultiStageQueryThrottler queryThrottler, FailureDetector 
failureDetector, ThreadAccountant threadAccountant) {
+      MultiStageQueryThrottler queryThrottler, FailureDetector 
failureDetector, ThreadAccountant threadAccountant,
+      MultiClusterRoutingContext multiClusterRoutingContext) {
     super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
-        threadAccountant);
+        threadAccountant, multiClusterRoutingContext);
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = 
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
     _workerManager = new WorkerManager(_brokerId, hostname, port, 
_routingManager);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 8e8e7cd85aa..9f4d51252fe 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -38,6 +38,7 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.TableRouteInfo;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
@@ -73,9 +74,10 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseSingleStageBrokerR
       BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
       NettyConfig nettyConfig, TlsConfig tlsConfig, ServerRoutingStatsManager 
serverRoutingStatsManager,
-      FailureDetector failureDetector, ThreadAccountant threadAccountant) {
+      FailureDetector failureDetector, ThreadAccountant threadAccountant,
+      MultiClusterRoutingContext multiClusterRoutingContext) {
     super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
-        threadAccountant);
+        threadAccountant, multiClusterRoutingContext);
     _brokerReduceService = new BrokerReduceService(_config);
     _queryRouter = new QueryRouter(_brokerId, nettyConfig, tlsConfig, 
serverRoutingStatsManager, threadAccountant);
     _failureDetector = failureDetector;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 8306c4bd33b..309a53dddc3 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -52,6 +52,7 @@ import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.utils.HumanReadableDuration;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.auth.AuthorizationResult;
@@ -85,9 +86,10 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
   public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId,
       BrokerRequestIdGenerator requestIdGenerator, BrokerRoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      QueryDispatcher queryDispatcher, ThreadAccountant threadAccountant) {
+      QueryDispatcher queryDispatcher, ThreadAccountant threadAccountant,
+      MultiClusterRoutingContext multiClusterRoutingContext) {
     super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
-        threadAccountant);
+        threadAccountant, multiClusterRoutingContext);
     _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, 
tableCache);
     _queryEnvironment.init(config);
     _queryDispatcher = queryDispatcher;
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
index 3df8c7bcc8f..6cce23c0e80 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
@@ -191,7 +191,7 @@ public class BaseSingleStageBrokerRequestHandlerTest {
     BaseSingleStageBrokerRequestHandler requestHandler =
         new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", new 
BrokerRequestIdGenerator(), routingManager,
             new AllowAllAccessControlFactory(), queryQuotaManager, tableCache,
-            ThreadAccountantUtils.getNoOpAccountant()) {
+            ThreadAccountantUtils.getNoOpAccountant(), null) {
           @Override
           public void start() {
           }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index ab588aeb7bc..f0038143cd5 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -174,7 +174,7 @@ public class LiteralOnlyBrokerRequestTest {
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
"testBrokerId",
             new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY, 
null, null, null, null,
             mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
-            ThreadAccountantUtils.getNoOpAccountant());
+            ThreadAccountantUtils.getNoOpAccountant(), null);
 
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
@@ -200,7 +200,7 @@ public class LiteralOnlyBrokerRequestTest {
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
"testBrokerId",
             new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY, 
null, null, null, null,
             mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
-            ThreadAccountantUtils.getNoOpAccountant());
+            ThreadAccountantUtils.getNoOpAccountant(), null);
     long currentTsMin = System.currentTimeMillis();
     BrokerResponse brokerResponse = requestHandler.handleRequest(
         "SELECT now() AS currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd 
z') AS firstDayOf2020");
@@ -356,7 +356,7 @@ public class LiteralOnlyBrokerRequestTest {
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
"testBrokerId",
             new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY, 
null, null, null, null,
             mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
-            ThreadAccountantUtils.getNoOpAccountant());
+            ThreadAccountantUtils.getNoOpAccountant(), null);
 
     // Test 1: select constant
     BrokerResponse brokerResponse = requestHandler.handleRequest("EXPLAIN PLAN 
FOR SELECT 1.5, 'test'");
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index e1ac8e352e1..9e1bbc3d8f0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -147,20 +147,6 @@ public class LogicalTableConfigUtils {
       String physicalTableName = entry.getKey();
       PhysicalTableConfig physicalTableConfig = entry.getValue();
 
-      // validate database name matches
-      String physicalTableDatabaseName = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalTableName);
-      if (!StringUtils.equalsIgnoreCase(databaseName, 
physicalTableDatabaseName)) {
-        throw new IllegalArgumentException(
-            "Invalid logical table. Reason: '" + physicalTableName
-                + "' should have the same database name as logical table: " + 
databaseName + " != "
-                + physicalTableDatabaseName);
-      }
-
-      // validate physical table exists
-      if (!physicalTableExistsPredicate.test(physicalTableName)) {
-        throw new IllegalArgumentException(
-            "Invalid logical table. Reason: '" + physicalTableName + "' should 
be one of the existing tables");
-      }
       // validate physical table config is not null
       if (physicalTableConfig == null) {
         throw new IllegalArgumentException(
@@ -168,6 +154,24 @@ public class LogicalTableConfigUtils {
                 + physicalTableName);
       }
 
+      // Skip database and existence validation for multi-cluster physical 
tables
+      if (!physicalTableConfig.isMultiCluster()) {
+        // validate database name matches
+        String physicalTableDatabaseName = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalTableName);
+        if (!StringUtils.equalsIgnoreCase(databaseName, 
physicalTableDatabaseName)) {
+          throw new IllegalArgumentException(
+              "Invalid logical table. Reason: '" + physicalTableName
+                  + "' should have the same database name as logical table: " 
+ databaseName + " != "
+                  + physicalTableDatabaseName);
+        }
+
+        // validate physical table exists
+        if (!physicalTableExistsPredicate.test(physicalTableName)) {
+          throw new IllegalArgumentException(
+              "Invalid logical table. Reason: '" + physicalTableName + "' 
should be one of the existing tables");
+        }
+      }
+
       if (TableNameBuilder.isOfflineTableResource(physicalTableName)) {
         offlineTableNames.add(physicalTableName);
       } else if (TableNameBuilder.isRealtimeTableResource(physicalTableName)) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5ef28bde147..6cd5f1da334 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -475,6 +475,11 @@ public class QueryOptionsUtils {
     return option != null ? Boolean.parseBoolean(option) : defaultValue;
   }
 
+  public static boolean isMultiClusterRoutingEnabled(Map<String, String> 
queryOptions, boolean defaultValue) {
+    String option = 
queryOptions.get(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING);
+    return option != null ? Boolean.parseBoolean(option) : defaultValue;
+  }
+
   public static boolean isUseLiteMode(Map<String, String> queryOptions, 
boolean defaultValue) {
     String option = queryOptions.get(QueryOptionKey.USE_LITE_MODE);
     return option != null ? Boolean.parseBoolean(option) : defaultValue;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
index db359e404dd..2ad5fb6b2de 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/LogicalTableRouteProvider.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.routing;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
@@ -29,6 +30,7 @@ import 
org.apache.pinot.core.routing.timeboundary.TimeBoundaryStrategyService;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +38,39 @@ import org.slf4j.LoggerFactory;
 
 public class LogicalTableRouteProvider implements TableRouteProvider {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LogicalTableRouteProvider.class);
+  @Nullable
+  private final MultiClusterRoutingContext _multiClusterRoutingContext;
+
+  public LogicalTableRouteProvider() {
+    this(null);
+  }
+
+  public LogicalTableRouteProvider(MultiClusterRoutingContext 
multiClusterRoutingContext) {
+    _multiClusterRoutingContext = multiClusterRoutingContext;
+  }
+
+  /**
+   * Finds table config from local cache first, then searches federated caches 
if not found.
+   * Returns null if the table config is not found in any cache.
+   */
+  private TableCache findTableCache(String tableName, TableCache 
localTableCache) {
+    // Try local cache first
+    TableConfig tableConfig = localTableCache.getTableConfig(tableName);
+    if (tableConfig != null) {
+      return localTableCache;
+    }
+
+    // If not found locally and federation is enabled, search federated caches
+    if (_multiClusterRoutingContext != null) {
+      for (TableCache federatedCache : 
_multiClusterRoutingContext.getTableCacheMap().values()) {
+        tableConfig = federatedCache.getTableConfig(tableName);
+        if (tableConfig != null) {
+          return federatedCache;
+        }
+      }
+    }
+    return null;
+  }
 
   @Override
   public TableRouteInfo getTableRouteInfo(String tableName, TableCache 
tableCache, RoutingManager routingManager) {
@@ -56,11 +91,22 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
 
     List<ImplicitHybridTableRouteInfo> offlineTables = new ArrayList<>();
     List<ImplicitHybridTableRouteInfo> realtimeTables = new ArrayList<>();
-    for (String physicalTableName : 
logicalTableConfig.getPhysicalTableConfigMap().keySet()) {
+    for (var physicalTableEntrySet : 
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+      String physicalTableName = physicalTableEntrySet.getKey();
+      PhysicalTableConfig physicalTableConfig = 
physicalTableEntrySet.getValue();
       TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
       Preconditions.checkNotNull(tableType);
+
+      // Selecting any table cache containing the physical table is acceptable 
for federation since
+      // fillTableConfigMetadata only reads metadata from table config, and if 
a table exists in multiple
+      // federated clusters, the table configs should be consistent.
+      TableCache selectedTableCache = findTableCache(physicalTableName, 
tableCache);
+      if (selectedTableCache == null) {
+        // if the table is not found in any cache, ignore it.
+        continue;
+      }
       ImplicitHybridTableRouteInfo physicalTableInfo = new 
ImplicitHybridTableRouteInfo();
-      routeProvider.fillTableConfigMetadata(physicalTableInfo, 
physicalTableName, tableCache);
+      routeProvider.fillTableConfigMetadata(physicalTableInfo, 
physicalTableName, selectedTableCache);
 
       if (physicalTableInfo.isExists()) {
         if (tableType == TableType.OFFLINE) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
index 1f139da56ac..251e84ee048 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.routing;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 
 
 /**
@@ -52,4 +53,28 @@ public class MultiClusterRoutingContext {
     _localRoutingManager = localRoutingManager;
     _multiClusterRoutingManager = multiClusterRoutingManager;
   }
+
+  public Map<String, TableCache> getTableCacheMap() {
+    return _tableCacheMap;
+  }
+
+  public TableCache getTableCache(String clusterName) {
+    return _tableCacheMap.get(clusterName);
+  }
+
+  /**
+   * Returns the appropriate routing manager based on query options.
+   * If federation is enabled in query options and a multi cluster routing 
manager is available,
+   * returns the federated routing manager. Otherwise, returns the primary 
routing manager.
+   *
+   * @param queryOptions Query options containing federation flag
+   * @return The appropriate routing manager for the query
+   */
+  public RoutingManager getRoutingManager(Map<String, String> queryOptions) {
+    boolean isMultiClusterRoutingEnabled = 
QueryOptionsUtils.isMultiClusterRoutingEnabled(queryOptions, false);
+    if (isMultiClusterRoutingEnabled && _multiClusterRoutingManager != null) {
+      return _multiClusterRoutingManager;
+    }
+    return _localRoutingManager;
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index e9ac332e2e9..e2857348a85 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.BaseControllerStarter;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
 import org.apache.pinot.integration.tests.ClusterTest;
@@ -46,6 +47,8 @@ import 
org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -54,6 +57,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
@@ -76,6 +81,13 @@ public class MultiClusterIntegrationTest extends ClusterTest 
{
   protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
   protected static final ClusterConfig CLUSTER_1_CONFIG = new 
ClusterConfig(CLUSTER_1_NAME, 30000);
   protected static final ClusterConfig CLUSTER_2_CONFIG = new 
ClusterConfig(CLUSTER_2_NAME, 40000);
+  protected static final String DEFAULT_TENANT = "DefaultTenant";
+  protected static final String LOGICAL_TABLE_NAME = "logical_table";
+  protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
+  protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE = 
"logical_federation_table_cluster1";
+  protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE = 
"logical_federation_table_cluster2";
+  protected static final int CLUSTER_1_SIZE = 1500;
+  protected static final int CLUSTER_2_SIZE = 1000;
 
   protected ClusterComponents _cluster1;
   protected ClusterComponents _cluster2;
@@ -127,8 +139,8 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     createSchemaAndTableOnBothClusters(testTableName);
 
     // Create and load test data into both clusters
-    _cluster1AvroFiles = createAvroData(100, 1);
-    _cluster2AvroFiles = createAvroData(100, 2);
+    _cluster1AvroFiles = createAvroData(CLUSTER_1_SIZE, 1);
+    _cluster2AvroFiles = createAvroData(CLUSTER_2_SIZE, 2);
 
     loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
     loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
@@ -138,17 +150,36 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     String result1 = executeQuery(query, _cluster1);
     assertNotNull(result1, "Query result from cluster 1 should not be null");
     long count1 = parseCountResult(result1);
-    assertEquals(count1, 100, "Cluster 1 should have 100 records");
+    assertEquals(count1, CLUSTER_1_SIZE);
 
     // Verify cluster 2 is queryable
     String result2 = executeQuery(query, _cluster2);
     assertNotNull(result2, "Query result from cluster 2 should not be null");
     long count2 = parseCountResult(result2);
-    assertEquals(count2, 100, "Cluster 2 should have 100 records");
+    assertEquals(count2, CLUSTER_2_SIZE);
 
     LOGGER.info("Multi-cluster broker test passed: both clusters started and 
queryable");
   }
 
+  @Test
+  public void testLogicalFederationTwoOfflineTablesSSE() throws Exception {
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster1._controllerBaseApiUrl);
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster2._controllerBaseApiUrl);
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster1._controllerBaseApiUrl);
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster2._controllerBaseApiUrl);
+    setupFirstLogicalFederatedTable();
+    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
+        LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+    cleanSegmentDirs();
+    _cluster1AvroFiles = createAvroData(CLUSTER_1_SIZE, 1);
+    _cluster2AvroFiles = createAvroData(CLUSTER_2_SIZE, 2);
+    loadDataIntoCluster(_cluster1AvroFiles, 
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
+    loadDataIntoCluster(_cluster2AvroFiles, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
+    long expectedTotal = CLUSTER_1_SIZE + CLUSTER_2_SIZE;
+    assertEquals(getCount(LOGICAL_TABLE_NAME, _cluster1, true), expectedTotal);
+    assertEquals(getCount(LOGICAL_TABLE_NAME, _cluster2, true), expectedTotal);
+  }
+
   @Override
   protected BaseBrokerStarter createBrokerStarter() {
     return new MultiClusterHelixBrokerStarter();
@@ -487,4 +518,68 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
       LOGGER.warn("Error stopping cluster", e);
     }
   }
+
+  protected void cleanSegmentDirs() {
+    cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir, 
_cluster2._segmentDir, _cluster2._tarDir);
+  }
+
+  protected long getCount(String tableName, ClusterComponents cluster, boolean 
enableMultiClusterRouting)
+      throws Exception {
+    String query = "SET enableMultiClusterRouting=" + 
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
+      + tableName;
+    return parseCountResult(executeQuery(query, cluster));
+  }
+
+  /*
+  Logical table helper methods
+  */
+  protected void createLogicalTable(String schemaFile,
+      Map<String, PhysicalTableConfig> physicalTableConfigMap, String 
brokerTenant, String controllerBaseApiUrl,
+      String logicalTable, String refOfflineTable, String refRealtimeTable) 
throws IOException {
+    ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
+    ControllerRequestClient client = new ControllerRequestClient(urlBuilder, 
getHttpClient(),
+        getControllerRequestClientHeaders());
+    Schema schema = createSchema(schemaFile);
+    schema.setSchemaName(logicalTable);
+    client.addSchema(schema);
+    LogicalTableConfig config = new LogicalTableConfigBuilder()
+        .setTableName(logicalTable)
+        .setBrokerTenant(brokerTenant)
+        .setRefOfflineTableName(refOfflineTable)
+        .setRefRealtimeTableName(refRealtimeTable)
+        .setPhysicalTableConfigMap(physicalTableConfigMap)
+        .build();
+    String response = 
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
+        config.toSingleLineJsonString(), Map.of());
+    assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" + 
logicalTable
+        + " logical table successfully added.\"}");
+  }
+
+  protected void createLogicalTableOnBothClusters(String logicalTableName,
+      String cluster1PhysicalTable, String cluster2PhysicalTable) throws 
IOException {
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = Map.of(
+        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
+        cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
+    );
+
+    createLogicalTable(SCHEMA_FILE, physicalTableConfigMap, DEFAULT_TENANT,
+        _cluster1._controllerBaseApiUrl, logicalTableName, 
cluster1PhysicalTable + "_OFFLINE", null);
+    createLogicalTable(SCHEMA_FILE, physicalTableConfigMap, DEFAULT_TENANT,
+        _cluster2._controllerBaseApiUrl, logicalTableName, 
cluster2PhysicalTable + "_OFFLINE", null);
+  }
+
+  protected void dropLogicalTableIfExists(String logicalTableName, String 
controllerBaseApiUrl) {
+    dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
+  }
+
+  protected void setupFirstLogicalFederatedTable() throws Exception {
+    setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+  }
+
+  protected void setupLogicalFederatedTable(String cluster1TableName, String 
cluster2TableName) throws Exception {
+    dropTableAndSchemaIfExists(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
+    dropTableAndSchemaIfExists(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
index c86fcf97dcf..af91f83e910 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.data;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 
 
@@ -26,4 +27,18 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
  * This is empty by design and more docs would be added as features are added.
  */
 public class PhysicalTableConfig extends BaseJsonConfig {
+  @JsonProperty("multiCluster")
+  private final boolean _isMultiCluster;
+
+  public PhysicalTableConfig() {
+    this(false);
+  }
+
+  public PhysicalTableConfig(boolean isMultiCluster) {
+    _isMultiCluster = isMultiCluster;
+  }
+
+  public boolean isMultiCluster() {
+    return _isMultiCluster;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3004cafdc73..69862016e8c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -865,6 +865,8 @@ public class CommonConstants {
         // MAX(stringCol) -> MAXSTRING(stringCol)
         // SUM(intCol) -> SUMINT(intCol)
         public static final String AUTO_REWRITE_AGGREGATION_TYPE = 
"autoRewriteAggregationType";
+        // When enabled, allows multi cluster/federated queries to be executed.
+        public static final String ENABLE_MULTI_CLUSTER_ROUTING = 
"enableMultiClusterRouting";
 
         /// Option to customize the value of 
[Broker#CONFIG_OF_SORT_EXCHANGE_COPY_THRESHOLD]
         public static final String SORT_EXCHANGE_COPY_THRESHOLD = 
"sortExchangeCopyThreshold";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to