mcvsubbu commented on a change in pull request #4222: Add startup/shutdown
checks for HelixServerStarter
URL: https://github.com/apache/incubator-pinot/pull/4222#discussion_r286101047
##########
File path:
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
##########
@@ -352,135 +287,160 @@ public void stop() {
_adminApiApplication.stop();
setShuttingDownStatus(true);
- // Total waiting time should include max query time.
- final long endTime = _maxShutdownWaitTimeMs + System.currentTimeMillis();
- if
(_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY,
true)) {
- Uninterruptibles.sleepUninterruptibly(_maxQueryTimeMs,
TimeUnit.MILLISECONDS);
+ long endTimeMs = startTimeMs +
_serverConf.getLong(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS);
+ if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+ shutdownQueryCheck(endTimeMs);
}
- waitUntilNoIncomingQueries(System.currentTimeMillis(), endTime);
_helixManager.disconnect();
_serverInstance.shutDown();
- waitUntilNoOnlineResources(System.currentTimeMillis(), endTime);
- }
-
- private void waitUntilNoIncomingQueries(long startTime, final long endTime) {
- if (startTime >= endTime) {
- LOGGER.warn("Skip waiting until no incoming queries.");
- return;
+ if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK,
DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+ shutdownResourceCheck(endTimeMs);
}
- LOGGER.info("Waiting upto {}ms until Pinot server doesn't receive any
incoming queries...", (endTime - startTime));
- long currentTime = startTime;
+ }
- while (currentTime < endTime) {
- if (noIncomingQueries(currentTime)) {
- LOGGER.info("No incoming query within {}ms. Total waiting Time: {}ms",
_checkIntervalTimeMs,
- (currentTime - startTime));
- return;
+ /**
+ * When shutting down the server, drains the queries and waits for all the
existing queries to be finished.
+ *
+ * @param endTimeMs Timeout for the check
+ */
+ private void shutdownQueryCheck(long endTimeMs) {
+ LOGGER.info("Starting shutdown query check");
+ long startTimeMs = System.currentTimeMillis();
+
+ long maxQueryTimeMs =
_serverConf.getLong(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT,
DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+ long noQueryThresholdMs =
_serverConf.getLong(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
+
+ // Drain queries
+ boolean queriesDrained = false;
+ long currentTimeMs;
+ while ((currentTimeMs = System.currentTimeMillis()) < endTimeMs) {
+ long latestQueryTimeMs = _serverInstance.getLatestQueryTime();
+ if (currentTimeMs >= latestQueryTimeMs + noQueryThresholdMs) {
+ LOGGER.info("Finished draining queries (no query received within {}ms)
after {}ms",
+ currentTimeMs - latestQueryTimeMs, currentTimeMs - startTimeMs);
+ queriesDrained = true;
+ break;
}
-
try {
- Thread.sleep(Math.min(_maxQueryTimeMs, (endTime - currentTime)));
+ Thread.sleep(Math.min(noQueryThresholdMs - latestQueryTimeMs,
endTimeMs - currentTimeMs));
} catch (InterruptedException e) {
- LOGGER.error("Interrupted when waiting for Pinot server not to receive
any queries.", e);
+ LOGGER.error("Got interrupted while draining queries", e);
Thread.currentThread().interrupt();
- return;
+ break;
+ }
+ }
+ if (queriesDrained) {
+ // Ensure all the existing queries are finished
+ long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() +
maxQueryTimeMs;
+ if (latestQueryFinishTimeMs > currentTimeMs) {
+ try {
+ Thread.sleep(latestQueryFinishTimeMs - currentTimeMs);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Got interrupted while waiting for all the existing
queries to be finished", e);
+ }
}
- currentTime = System.currentTimeMillis();
+ } else {
+ LOGGER.warn("Failed to drain queries within {}ms",
System.currentTimeMillis() - startTimeMs);
}
- LOGGER.error("Reach timeout when waiting for no incoming queries! Max
waiting time: {}ms", _maxShutdownWaitTimeMs);
}
/**
- * Init a helix spectator to watch the external view updates.
+ * When shutting down the server, waits for all the resources to turn OFFLINE
(all partitions served by the server are
+ * neither ONLINE nor CONSUMING).
+ *
+ * @param endTimeMs Timeout for the check
*/
- private void waitUntilNoOnlineResources(long startTime, final long endTime) {
- if (startTime >= endTime) {
- LOGGER.warn("Skip waiting until no online resources.");
+ private void shutdownResourceCheck(long endTimeMs) {
+ LOGGER.info("Starting shutdown resource check");
+ long startTimeMs = System.currentTimeMillis();
+
+ if (startTimeMs >= endTimeMs) {
+ LOGGER.warn("Skipping shutdown resource check because shutdown timeout
is already reached");
return;
}
- LOGGER.info("Waiting upto {}ms until no online resources...", (endTime -
startTime));
- // Initialize a helix spectator.
- HelixManager spectatorManager =
- HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId,
InstanceType.SPECTATOR, _zkServers);
+ HelixAdmin helixAdmin = null;
try {
- spectatorManager.connect();
-
- Set<String> resources =
fetchLatestTableResources(spectatorManager.getClusterManagmentTool());
+ helixAdmin = new ZKHelixAdmin(_zkServers);
+
+ // Monitor all enabled table resources that the server serves
+ Set<String> resourcesToMonitor = new HashSet<>();
+ for (String resourceName :
helixAdmin.getResourcesInCluster(_helixClusterName)) {
+ if (TableNameBuilder.isTableResource(resourceName)) {
+ IdealState idealState =
helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
+ if (idealState == null || !idealState.isEnabled()) {
+ continue;
+ }
+ for (String partition : idealState.getPartitionSet()) {
+ if (idealState.getInstanceSet(partition).contains(_instanceId)) {
+ resourcesToMonitor.add(resourceName);
+ break;
+ }
+ }
+ }
+ }
- long currentTime = startTime;
- while (currentTime < endTime) {
- if (noOnlineResources(spectatorManager, resources)) {
- LOGGER.info("No online resource within {}ms. Total waiting Time:
{}ms", _checkIntervalTimeMs,
- (currentTime - startTime));
+ long checkIntervalMs = _serverConf
+ .getLong(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS,
DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+ while (System.currentTimeMillis() < endTimeMs) {
+ Iterator<String> iterator = resourcesToMonitor.iterator();
+ while (iterator.hasNext()) {
+ if (isResourceOffline(helixAdmin, iterator.next())) {
+ iterator.remove();
+ } else {
+ // Do not check remaining resources if one resource is not OFFLINE
+ break;
+ }
+ }
+ if (resourcesToMonitor.isEmpty()) {
+ LOGGER.info("All resources are OFFLINE after {}ms",
System.currentTimeMillis() - startTimeMs);
return;
}
-
try {
- Thread.sleep(Math.min(_checkIntervalTimeMs, (endTime -
currentTime)));
+ Thread.sleep(Math.min(checkIntervalMs, endTimeMs -
System.currentTimeMillis()));
} catch (InterruptedException e) {
- LOGGER.error("Interrupted when waiting for no online resources.", e);
+ LOGGER.warn("Got interrupted while waiting for all resources
OFFLINE", e);
Review comment:
We should return here if we got interrupted.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]