This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 81bda1d26a5c5173864e6edc3081e7135041f4f1 Author: Rong Rong <[email protected]> AuthorDate: Mon Jun 6 16:41:15 2022 -0700 fix merge conflicts --- .github/workflows/pinot_tests.yml | 1 - .../org/apache/pinot/query/QueryEnvironment.java | 8 ++- .../query/runtime/utils/ServerRequestUtils.java | 2 +- .../org/apache/pinot/server/conf/ServerConf.java | 9 ++++ .../pinot/server/starter/ServerInstance.java | 57 ++++++++++------------ .../server/starter/helix/BaseServerStarter.java | 8 +++ 6 files changed, 51 insertions(+), 34 deletions(-) diff --git a/.github/workflows/pinot_tests.yml b/.github/workflows/pinot_tests.yml index cd7faab560..e27af6f790 100644 --- a/.github/workflows/pinot_tests.yml +++ b/.github/workflows/pinot_tests.yml @@ -34,7 +34,6 @@ on: pull_request: branches: - master - - multi_stage_query_engine paths-ignore: - "contrib/**" - "docs/**" diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 3a19156287..02c66247b6 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -19,6 +19,9 @@ package org.apache.pinot.query; import java.util.Collection; +import java.util.Properties; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.config.CalciteConnectionProperty; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; @@ -84,7 +87,10 @@ public class QueryEnvironment { _planner = new PlannerImpl(_config); // catalog - _catalogReader = new CalciteCatalogReader(_rootSchema, _rootSchema.path(null), _typeFactory, null); + Properties catalogReaderConfigProperties = new Properties(); + catalogReaderConfigProperties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "true"); + _catalogReader 
= new CalciteCatalogReader(_rootSchema, _rootSchema.path(null), _typeFactory, + new CalciteConnectionConfigImpl(catalogReaderConfigProperties)); _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, _typeFactory); // optimizer rules diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java index 5f17e83305..86ec13adcf 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java @@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.utils; import java.util.Map; import java.util.stream.Collectors; -import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.DataSource; @@ -36,6 +35,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode; import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.planner.stage.TableScanNode; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.spi.metrics.PinotMetricUtils; /** diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java index a1f8e910af..908eb1b484 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants.Helix; import 
org.apache.pinot.spi.utils.CommonConstants.Server; @@ -109,6 +110,14 @@ public class ServerConf { return _serverConf.getProperty(Server.CONFIG_OF_GRPC_PORT, Server.DEFAULT_GRPC_PORT); } + public int getMultiStageServicePort() { + return _serverConf.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_SERVER_PORT); + } + + public int getMultiStageMailboxPort() { + return _serverConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); + } + public PinotConfiguration getConfig(String component) { return _serverConf.subset(PINOT_CONFIG_PREFIX + component); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 39026af155..ec1a32b3bf 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -107,41 +107,36 @@ public class ServerInstance { _accessControl = accessControlFactory.create(); if (serverConf.isMultiStageServerEnabled()) { - // WorkerQueryServer initialization - // because worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport, - // we can't enable any of the other 2 servers - // TODO: decouple server protocol and engine type. 
+ LOGGER.info("Initializing Multi-stage query engine"); _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, _serverMetrics); - _nettyQueryServer = null; - _nettyTlsQueryServer = null; - _grpcQueryServer = null; } else { _workerQueryServer = null; - if (serverConf.isNettyServerEnabled()) { - int nettyPort = serverConf.getNettyPort(); - LOGGER.info("Initializing Netty query server on port: {}", nettyPort); - _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, _serverMetrics, nettyConfig); - } else { - _nettyQueryServer = null; - } + } - if (serverConf.isNettyTlsServerEnabled()) { - int nettySecPort = serverConf.getNettyTlsPort(); - LOGGER.info("Initializing TLS-secured Netty query server on port: {}", nettySecPort); - _nettyTlsQueryServer = - new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, nettyConfig, tlsConfig, _accessControl); - } else { - _nettyTlsQueryServer = null; - } - if (serverConf.isEnableGrpcServer()) { - int grpcPort = serverConf.getGrpcPort(); - LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); - _grpcQueryServer = new GrpcQueryServer(grpcPort, - serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), - CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, _queryExecutor, _serverMetrics, _accessControl); - } else { - _grpcQueryServer = null; - } + if (serverConf.isNettyServerEnabled()) { + int nettyPort = serverConf.getNettyPort(); + LOGGER.info("Initializing Netty query server on port: {}", nettyPort); + _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, _serverMetrics, nettyConfig); + } else { + _nettyQueryServer = null; + } + + if (serverConf.isNettyTlsServerEnabled()) { + int nettySecPort = serverConf.getNettyTlsPort(); + LOGGER.info("Initializing TLS-secured Netty query server on port: {}", nettySecPort); + _nettyTlsQueryServer = + new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, nettyConfig, tlsConfig, _accessControl); + } else { + _nettyTlsQueryServer = null; + } + if (serverConf.isEnableGrpcServer()) { + int grpcPort = serverConf.getGrpcPort(); + LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); + _grpcQueryServer = new GrpcQueryServer(grpcPort, + serverConf.isGrpcTlsServerEnabled() ? TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), + CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, _queryExecutor, _serverMetrics, _accessControl); + } else { + _grpcQueryServer = null; } LOGGER.info("Initializing transform functions"); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index a411daec2d..dd405c898d 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -329,6 +329,14 @@ public abstract class BaseServerStarter implements ServiceStartable { int grpcPort = serverConf.isEnableGrpcServer() ? 
serverConf.getGrpcPort() : Integer.MIN_VALUE; updated |= updatePortIfNeeded(simpleFields, Instance.GRPC_PORT_KEY, grpcPort); + // Update multi-stage query engine ports + if (serverConf.isMultiStageServerEnabled()) { + updated |= updatePortIfNeeded(simpleFields, + Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, serverConf.getMultiStageServicePort()); + updated |= updatePortIfNeeded(simpleFields, + Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, serverConf.getMultiStageMailboxPort()); + } + // Update environment properties if (_pinotEnvironmentProvider != null) { // Retrieve failure domain information and add to the environment properties map --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
