This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c4549e2e60 Fix the issue of server opening up query server prematurely (#8785)
c4549e2e60 is described below

commit c4549e2e60e464f3224ec92752fde606fe0e5555
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 26 17:05:52 2022 -0700

    Fix the issue of server opening up query server prematurely (#8785)
---
 .../pinot/server/starter/ServerInstance.java       | 78 ++++++++++++++--------
 .../server/starter/helix/BaseServerStarter.java    | 39 +++++------
 2 files changed, 66 insertions(+), 51 deletions(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 746059e3cf..3febfba096 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -63,7 +63,8 @@ public class ServerInstance {
   private final GrpcQueryServer _grpcQueryServer;
   private final AccessControl _accessControl;
 
-  private boolean _started = false;
+  private boolean _dataManagerStarted = false;
+  private boolean _queryServerStarted = false;
 
   public ServerInstance(ServerConf serverConf, HelixManager helixManager, 
AccessControlFactory accessControlFactory)
       throws Exception {
@@ -99,7 +100,7 @@ public class ServerInstance {
     NettyConfig nettyConfig =
         NettyConfig.extractNettyConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_NETTY_PREFIX);
     accessControlFactory.init(
-      
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
 helixManager);
+        
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
 helixManager);
     _accessControl = accessControlFactory.create();
 
     if (serverConf.isNettyServerEnabled()) {
@@ -143,18 +144,28 @@ public class ServerInstance {
     LOGGER.info("Finish initializing server instance");
   }
 
-  public synchronized void start() {
+  public synchronized void startDataManager() {
     // This method is called when Helix starts a new ZK session, and can be 
called multiple times. We only need to start
-    // the server instance once, and simply ignore the following invocations.
-    if (_started) {
-      LOGGER.info("Server instance is already running, skipping the start");
+    // the data manager once, and simply ignore the following invocations.
+    if (_dataManagerStarted) {
+      LOGGER.info("Data manager is already running, skipping the start");
       return;
     }
 
-    LOGGER.info("Starting server instance");
-
-    LOGGER.info("Starting instance data manager");
+    LOGGER.info("Starting data manager");
     _instanceDataManager.start();
+    _dataManagerStarted = true;
+    LOGGER.info("Finish starting data manager");
+  }
+
+  public synchronized void startQueryServer() {
+    if (_queryServerStarted) {
+      LOGGER.warn("Query server is already running, skipping the start");
+      return;
+    }
+
+    LOGGER.info("Starting query server");
+
     LOGGER.info("Starting query executor");
     _queryExecutor.start();
     LOGGER.info("Starting query scheduler");
@@ -172,39 +183,48 @@ public class ServerInstance {
       _grpcQueryServer.start();
     }
 
-    _started = true;
-    LOGGER.info("Finish starting server instance");
+    _queryServerStarted = true;
+    LOGGER.info("Finish starting query server");
   }
 
   public synchronized void shutDown() {
-    if (!_started) {
+    if (!_dataManagerStarted) {
       LOGGER.warn("Server instance is not running, skipping the shut down");
       return;
     }
 
     LOGGER.info("Shutting down server instance");
 
-    if (_nettyTlsQueryServer != null) {
-      LOGGER.info("Shutting down TLS-secured Netty query server");
-      _nettyTlsQueryServer.shutDown();
-    }
-    if (_grpcQueryServer != null) {
-      LOGGER.info("Shutting down gRPC query server");
-      _grpcQueryServer.shutdown();
-    }
-    if (_nettyQueryServer != null) {
-      LOGGER.info("Shutting down Netty query server");
-      _nettyQueryServer.shutDown();
+    if (_queryServerStarted) {
+      LOGGER.info("Shutting down query server");
+
+      if (_nettyQueryServer != null) {
+        LOGGER.info("Shutting down Netty query server");
+        _nettyQueryServer.shutDown();
+      }
+      if (_nettyTlsQueryServer != null) {
+        LOGGER.info("Shutting down TLS-secured Netty query server");
+        _nettyTlsQueryServer.shutDown();
+      }
+      if (_grpcQueryServer != null) {
+        LOGGER.info("Shutting down gRPC query server");
+        _grpcQueryServer.shutdown();
+      }
+      LOGGER.info("Shutting down query scheduler");
+      _queryScheduler.stop();
+      LOGGER.info("Shutting down query executor");
+      _queryExecutor.shutDown();
+
+      _queryServerStarted = false;
+      LOGGER.info("Finish shutting down query server");
     }
-    LOGGER.info("Shutting down query scheduler");
-    _queryScheduler.stop();
-    LOGGER.info("Shutting down query executor");
-    _queryExecutor.shutDown();
-    LOGGER.info("Shutting down instance data manager");
+
+    LOGGER.info("Shutting down data manager");
     _instanceDataManager.shutDown();
     LOGGER.info("Shutting down metrics registry");
     _serverMetrics.getMetricsRegistry().shutdown();
-    _started = false;
+
+    _dataManagerStarted = false;
     LOGGER.info("Finish shutting down server instance");
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6f004f75d2..ee555d9dfe 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -329,7 +329,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     int grpcPort = serverConf.isEnableGrpcServer() ? serverConf.getGrpcPort() 
: Integer.MIN_VALUE;
     updated |= updatePortIfNeeded(simpleFields, Instance.GRPC_PORT_KEY, 
grpcPort);
 
-    // Update instance config with environment properties
+    // Update environment properties
     if (_pinotEnvironmentProvider != null) {
       // Retrieve failure domain information and add to the environment 
properties map
       String failureDomain = _pinotEnvironmentProvider.getFailureDomain();
@@ -342,6 +342,14 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       }
     }
 
+    // Update system resource info (CPU, memory, etc)
+    Map<String, String> systemResourceInfoMap = new 
SystemResourceInfo().toMap();
+    if 
(!systemResourceInfoMap.equals(znRecord.getMapField(Instance.SYSTEM_RESOURCE_INFO_KEY)))
 {
+      LOGGER.info("Updating instance: {} with system resource info: {}", 
_instanceId, systemResourceInfoMap);
+      znRecord.setMapField(Instance.SYSTEM_RESOURCE_INFO_KEY, 
systemResourceInfoMap);
+      updated = true;
+    }
+
     // If 'shutdownInProgress' is not set (new instance, or not shut down 
properly), set it to prevent brokers routing
     // queries to it before finishing the startup check
     if 
(!Boolean.parseBoolean(simpleFields.get(Helix.IS_SHUTDOWN_IN_PROGRESS))) {
@@ -468,9 +476,9 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
     _helixManager.getStateMachineEngine()
         
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
 stateModelFactory);
-    // Start the server instance as a pre-connect callback so that it starts 
after connecting to the ZK in order to
-    // access the property store, but before receiving state transitions
-    _helixManager.addPreConnectCallback(_serverInstance::start);
+    // Start the data manager as a pre-connect callback so that it starts 
after connecting to the ZK in order to access
+    // the property store, but before receiving state transitions
+    _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
 
     LOGGER.info("Connecting Helix manager");
     _helixManager.connect();
@@ -505,15 +513,17 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
           startTimeMs + 
_serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS, 
Server.DEFAULT_STARTUP_TIMEOUT_MS);
       startupServiceStatusCheck(endTimeMs);
     }
+
+    // Start the query server after finishing the service status check. If the 
query server is started before all the
+    // segments are loaded, broker might not have finished processing the 
callback of routing table update, and start
+    // querying the server prematurely.
+    _serverInstance.startQueryServer();
     _helixAdmin.setConfig(_instanceConfigScope,
         Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, 
Boolean.toString(false)));
 
     // Throttling for realtime consumption is disabled up to this point to 
allow maximum consumption during startup time
     RealtimeConsumptionRateManager.getInstance().enableThrottling();
 
-    // Set the system resource info (CPU, Memory, etc) in the InstanceConfig.
-    setInstanceResourceInfo(_helixAdmin, _helixClusterName, _instanceId, new 
SystemResourceInfo().toMap());
-
     LOGGER.info("Pinot server ready");
 
     // Create metrics for mmap stuff
@@ -741,21 +751,6 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     return _serverInstance;
   }
 
-  /**
-   * Helper method to set system resource info into instance config.
-   *
-   * @param helixAdmin Helix Admin
-   * @param helixClusterName Name of Helix cluster
-   * @param instanceId Id of instance for which to set the system resource info
-   * @param systemResourceMap Map containing system resource info
-   */
-  protected void setInstanceResourceInfo(HelixAdmin helixAdmin, String 
helixClusterName, String instanceId,
-      Map<String, String> systemResourceMap) {
-    InstanceConfig instanceConfig = 
helixAdmin.getInstanceConfig(helixClusterName, instanceId);
-    
instanceConfig.getRecord().setMapField(Helix.Instance.SYSTEM_RESOURCE_INFO_KEY, 
systemResourceMap);
-    helixAdmin.setInstanceConfig(helixClusterName, instanceId, instanceConfig);
-  }
-
   /**
    * Initialize the components to download segments from deep store. They used 
to be
    * initialized in SegmentFetcherAndLoader, which has been removed to 
consolidate


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to