This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 81bda1d26a5c5173864e6edc3081e7135041f4f1
Author: Rong Rong <[email protected]>
AuthorDate: Mon Jun 6 16:41:15 2022 -0700

    fix merge conflicts
---
 .github/workflows/pinot_tests.yml                  |  1 -
 .../org/apache/pinot/query/QueryEnvironment.java   |  8 ++-
 .../query/runtime/utils/ServerRequestUtils.java    |  2 +-
 .../org/apache/pinot/server/conf/ServerConf.java   |  9 ++++
 .../pinot/server/starter/ServerInstance.java       | 57 ++++++++++------------
 .../server/starter/helix/BaseServerStarter.java    |  8 +++
 6 files changed, 51 insertions(+), 34 deletions(-)

diff --git a/.github/workflows/pinot_tests.yml 
b/.github/workflows/pinot_tests.yml
index cd7faab560..e27af6f790 100644
--- a/.github/workflows/pinot_tests.yml
+++ b/.github/workflows/pinot_tests.yml
@@ -34,7 +34,6 @@ on:
   pull_request:
     branches:
       - master
-      - multi_stage_query_engine
     paths-ignore:
       - "contrib/**"
       - "docs/**"
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 3a19156287..02c66247b6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -19,6 +19,9 @@
 package org.apache.pinot.query;
 
 import java.util.Collection;
+import java.util.Properties;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
@@ -84,7 +87,10 @@ public class QueryEnvironment {
     _planner = new PlannerImpl(_config);
 
     // catalog
-    _catalogReader = new CalciteCatalogReader(_rootSchema, 
_rootSchema.path(null), _typeFactory, null);
+    Properties catalogReaderConfigProperties = new Properties();
+    
catalogReaderConfigProperties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
 "true");
+    _catalogReader = new CalciteCatalogReader(_rootSchema, 
_rootSchema.path(null), _typeFactory,
+        new CalciteConnectionConfigImpl(catalogReaderConfigProperties));
     _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, 
_typeFactory);
 
     // optimizer rules
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 5f17e83305..86ec13adcf 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.utils;
 
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.DataSource;
@@ -36,6 +35,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
 
 
 /**
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java 
b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index a1f8e910af..908eb1b484 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
@@ -109,6 +110,14 @@ public class ServerConf {
     return _serverConf.getProperty(Server.CONFIG_OF_GRPC_PORT, 
Server.DEFAULT_GRPC_PORT);
   }
 
+  public int getMultiStageServicePort() {
+    return _serverConf.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 
QueryConfig.DEFAULT_QUERY_SERVER_PORT);
+  }
+
+  public int getMultiStageMailboxPort() {
+    return _serverConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
+  }
+
   public PinotConfiguration getConfig(String component) {
     return _serverConf.subset(PINOT_CONFIG_PREFIX + component);
   }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 39026af155..ec1a32b3bf 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -107,41 +107,36 @@ public class ServerInstance {
     _accessControl = accessControlFactory.create();
 
     if (serverConf.isMultiStageServerEnabled()) {
-      // WorkerQueryServer initialization
-      // because worker requires both the "Netty port" for protocol transport; 
and "GRPC port" for mailbox transport,
-      // we can't enable any of the other 2 servers
-      // TODO: decouple server protocol and engine type.
+      LOGGER.info("Initializing Multi-stage query engine");
       _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, _serverMetrics);
-      _nettyQueryServer = null;
-      _nettyTlsQueryServer = null;
-      _grpcQueryServer = null;
     } else {
       _workerQueryServer = null;
-      if (serverConf.isNettyServerEnabled()) {
-        int nettyPort = serverConf.getNettyPort();
-        LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
-        _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, 
_serverMetrics, nettyConfig);
-      } else {
-        _nettyQueryServer = null;
-      }
+    }
 
-      if (serverConf.isNettyTlsServerEnabled()) {
-        int nettySecPort = serverConf.getNettyTlsPort();
-        LOGGER.info("Initializing TLS-secured Netty query server on port: {}", 
nettySecPort);
-        _nettyTlsQueryServer =
-            new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, 
nettyConfig, tlsConfig, _accessControl);
-      } else {
-        _nettyTlsQueryServer = null;
-      }
-      if (serverConf.isEnableGrpcServer()) {
-        int grpcPort = serverConf.getGrpcPort();
-        LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
-        _grpcQueryServer = new GrpcQueryServer(grpcPort,
-            serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
-                CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
-      } else {
-        _grpcQueryServer = null;
-      }
+    if (serverConf.isNettyServerEnabled()) {
+      int nettyPort = serverConf.getNettyPort();
+      LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
+      _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, 
_serverMetrics, nettyConfig);
+    } else {
+      _nettyQueryServer = null;
+    }
+
+    if (serverConf.isNettyTlsServerEnabled()) {
+      int nettySecPort = serverConf.getNettyTlsPort();
+      LOGGER.info("Initializing TLS-secured Netty query server on port: {}", 
nettySecPort);
+      _nettyTlsQueryServer =
+          new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, 
nettyConfig, tlsConfig, _accessControl);
+    } else {
+      _nettyTlsQueryServer = null;
+    }
+    if (serverConf.isEnableGrpcServer()) {
+      int grpcPort = serverConf.getGrpcPort();
+      LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
+      _grpcQueryServer = new GrpcQueryServer(grpcPort,
+          serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
+              CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
+    } else {
+      _grpcQueryServer = null;
     }
 
     LOGGER.info("Initializing transform functions");
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index a411daec2d..dd405c898d 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -329,6 +329,14 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     int grpcPort = serverConf.isEnableGrpcServer() ? serverConf.getGrpcPort() 
: Integer.MIN_VALUE;
     updated |= updatePortIfNeeded(simpleFields, Instance.GRPC_PORT_KEY, 
grpcPort);
 
+    // Update multi-stage query engine ports
+    if (serverConf.isMultiStageServerEnabled()) {
+      updated |= updatePortIfNeeded(simpleFields,
+          Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, 
serverConf.getMultiStageServicePort());
+      updated |= updatePortIfNeeded(simpleFields,
+          Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, 
serverConf.getMultiStageMailboxPort());
+    }
+
     // Update environment properties
     if (_pinotEnvironmentProvider != null) {
       // Retrieve failure domain information and add to the environment 
properties map


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to