This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c4549e2e60 Fix the issue of server opening up query server prematurely
(#8785)
c4549e2e60 is described below
commit c4549e2e60e464f3224ec92752fde606fe0e5555
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 26 17:05:52 2022 -0700
Fix the issue of server opening up query server prematurely (#8785)
---
.../pinot/server/starter/ServerInstance.java | 78 ++++++++++++++--------
.../server/starter/helix/BaseServerStarter.java | 39 +++++------
2 files changed, 66 insertions(+), 51 deletions(-)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 746059e3cf..3febfba096 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -63,7 +63,8 @@ public class ServerInstance {
private final GrpcQueryServer _grpcQueryServer;
private final AccessControl _accessControl;
- private boolean _started = false;
+ private boolean _dataManagerStarted = false;
+ private boolean _queryServerStarted = false;
public ServerInstance(ServerConf serverConf, HelixManager helixManager,
AccessControlFactory accessControlFactory)
throws Exception {
@@ -99,7 +100,7 @@ public class ServerInstance {
NettyConfig nettyConfig =
NettyConfig.extractNettyConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_NETTY_PREFIX);
accessControlFactory.init(
-
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
helixManager);
+
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
helixManager);
_accessControl = accessControlFactory.create();
if (serverConf.isNettyServerEnabled()) {
@@ -143,18 +144,28 @@ public class ServerInstance {
LOGGER.info("Finish initializing server instance");
}
- public synchronized void start() {
+ public synchronized void startDataManager() {
// This method is called when Helix starts a new ZK session, and can be
called multiple times. We only need to start
- // the server instance once, and simply ignore the following invocations.
- if (_started) {
- LOGGER.info("Server instance is already running, skipping the start");
+ // the data manager once, and simply ignore the following invocations.
+ if (_dataManagerStarted) {
+ LOGGER.info("Data manager is already running, skipping the start");
return;
}
- LOGGER.info("Starting server instance");
-
- LOGGER.info("Starting instance data manager");
+ LOGGER.info("Starting data manager");
_instanceDataManager.start();
+ _dataManagerStarted = true;
+ LOGGER.info("Finish starting data manager");
+ }
+
+ public synchronized void startQueryServer() {
+ if (_queryServerStarted) {
+ LOGGER.warn("Query server is already running, skipping the start");
+ return;
+ }
+
+ LOGGER.info("Starting query server");
+
LOGGER.info("Starting query executor");
_queryExecutor.start();
LOGGER.info("Starting query scheduler");
@@ -172,39 +183,48 @@ public class ServerInstance {
_grpcQueryServer.start();
}
- _started = true;
- LOGGER.info("Finish starting server instance");
+ _queryServerStarted = true;
+ LOGGER.info("Finish starting query server");
}
public synchronized void shutDown() {
- if (!_started) {
+ if (!_dataManagerStarted) {
LOGGER.warn("Server instance is not running, skipping the shut down");
return;
}
LOGGER.info("Shutting down server instance");
- if (_nettyTlsQueryServer != null) {
- LOGGER.info("Shutting down TLS-secured Netty query server");
- _nettyTlsQueryServer.shutDown();
- }
- if (_grpcQueryServer != null) {
- LOGGER.info("Shutting down gRPC query server");
- _grpcQueryServer.shutdown();
- }
- if (_nettyQueryServer != null) {
- LOGGER.info("Shutting down Netty query server");
- _nettyQueryServer.shutDown();
+ if (_queryServerStarted) {
+ LOGGER.info("Shutting down query server");
+
+ if (_nettyQueryServer != null) {
+ LOGGER.info("Shutting down Netty query server");
+ _nettyQueryServer.shutDown();
+ }
+ if (_nettyTlsQueryServer != null) {
+ LOGGER.info("Shutting down TLS-secured Netty query server");
+ _nettyTlsQueryServer.shutDown();
+ }
+ if (_grpcQueryServer != null) {
+ LOGGER.info("Shutting down gRPC query server");
+ _grpcQueryServer.shutdown();
+ }
+ LOGGER.info("Shutting down query scheduler");
+ _queryScheduler.stop();
+ LOGGER.info("Shutting down query executor");
+ _queryExecutor.shutDown();
+
+ _queryServerStarted = false;
+ LOGGER.info("Finish shutting down query server");
}
- LOGGER.info("Shutting down query scheduler");
- _queryScheduler.stop();
- LOGGER.info("Shutting down query executor");
- _queryExecutor.shutDown();
- LOGGER.info("Shutting down instance data manager");
+
+ LOGGER.info("Shutting down data manager");
_instanceDataManager.shutDown();
LOGGER.info("Shutting down metrics registry");
_serverMetrics.getMetricsRegistry().shutdown();
- _started = false;
+
+ _dataManagerStarted = false;
LOGGER.info("Finish shutting down server instance");
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6f004f75d2..ee555d9dfe 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -329,7 +329,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
int grpcPort = serverConf.isEnableGrpcServer() ? serverConf.getGrpcPort()
: Integer.MIN_VALUE;
updated |= updatePortIfNeeded(simpleFields, Instance.GRPC_PORT_KEY,
grpcPort);
- // Update instance config with environment properties
+ // Update environment properties
if (_pinotEnvironmentProvider != null) {
// Retrieve failure domain information and add to the environment
properties map
String failureDomain = _pinotEnvironmentProvider.getFailureDomain();
@@ -342,6 +342,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
}
}
+ // Update system resource info (CPU, memory, etc)
+ Map<String, String> systemResourceInfoMap = new
SystemResourceInfo().toMap();
+ if
(!systemResourceInfoMap.equals(znRecord.getMapField(Instance.SYSTEM_RESOURCE_INFO_KEY)))
{
+ LOGGER.info("Updating instance: {} with system resource info: {}",
_instanceId, systemResourceInfoMap);
+ znRecord.setMapField(Instance.SYSTEM_RESOURCE_INFO_KEY,
systemResourceInfoMap);
+ updated = true;
+ }
+
// If 'shutdownInProgress' is not set (new instance, or not shut down
properly), set it to prevent brokers routing
// queries to it before finishing the startup check
if
(!Boolean.parseBoolean(simpleFields.get(Helix.IS_SHUTDOWN_IN_PROGRESS))) {
@@ -468,9 +476,9 @@ public abstract class BaseServerStarter implements
ServiceStartable {
new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager);
_helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
stateModelFactory);
- // Start the server instance as a pre-connect callback so that it starts
after connecting to the ZK in order to
- // access the property store, but before receiving state transitions
- _helixManager.addPreConnectCallback(_serverInstance::start);
+ // Start the data manager as a pre-connect callback so that it starts
after connecting to the ZK in order to access
+ // the property store, but before receiving state transitions
+ _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
LOGGER.info("Connecting Helix manager");
_helixManager.connect();
@@ -505,15 +513,17 @@ public abstract class BaseServerStarter implements
ServiceStartable {
startTimeMs +
_serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS,
Server.DEFAULT_STARTUP_TIMEOUT_MS);
startupServiceStatusCheck(endTimeMs);
}
+
+ // Start the query server after finishing the service status check. If the
query server is started before all the
+ // segments are loaded, broker might not have finished processing the
callback of routing table update, and start
+ // querying the server prematurely.
+ _serverInstance.startQueryServer();
_helixAdmin.setConfig(_instanceConfigScope,
Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS,
Boolean.toString(false)));
// Throttling for realtime consumption is disabled up to this point to
allow maximum consumption during startup time
RealtimeConsumptionRateManager.getInstance().enableThrottling();
- // Set the system resource info (CPU, Memory, etc) in the InstanceConfig.
- setInstanceResourceInfo(_helixAdmin, _helixClusterName, _instanceId, new
SystemResourceInfo().toMap());
-
LOGGER.info("Pinot server ready");
// Create metrics for mmap stuff
@@ -741,21 +751,6 @@ public abstract class BaseServerStarter implements
ServiceStartable {
return _serverInstance;
}
- /**
- * Helper method to set system resource info into instance config.
- *
- * @param helixAdmin Helix Admin
- * @param helixClusterName Name of Helix cluster
- * @param instanceId Id of instance for which to set the system resource info
- * @param systemResourceMap Map containing system resource info
- */
- protected void setInstanceResourceInfo(HelixAdmin helixAdmin, String
helixClusterName, String instanceId,
- Map<String, String> systemResourceMap) {
- InstanceConfig instanceConfig =
helixAdmin.getInstanceConfig(helixClusterName, instanceId);
-
instanceConfig.getRecord().setMapField(Helix.Instance.SYSTEM_RESOURCE_INFO_KEY,
systemResourceMap);
- helixAdmin.setInstanceConfig(helixClusterName, instanceId, instanceConfig);
- }
-
/**
* Initialize the components to download segments from deep store. They used
to be
* initialized in SegmentFetcherAndLoader, which has been removed to
consolidate
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]