This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 21a372b Starts Broker and Server in parallel when using ServiceManager (#5917) 21a372b is described below commit 21a372b2f58f9c6e27fe9710d6952ed519342061 Author: Adrian Cole <adrianc...@users.noreply.github.com> AuthorDate: Fri Aug 28 01:14:31 2020 +0800 Starts Broker and Server in parallel when using ServiceManager (#5917) * Starts Broker and Server in parallel when using ServiceManager Fixes #5876 * rejig * also time SM * block until services complete * getZKHelixManager can throw * Don't check helix as it spams logs * matches case format * Ensures any failure results in exit code 1 --- .../admin/command/StartServiceManagerCommand.java | 132 +++++++++++++++++---- 1 file changed, 111 insertions(+), 21 deletions(-) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java index 4fcd6b7..caa1ca0 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java @@ -23,10 +23,15 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.PINOT_SERVICE_ import java.io.File; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.controller.ControllerConf; @@ -41,13 +46,20 @@ import org.slf4j.LoggerFactory; /** - * Class to implement StartPinotService command. + * Starts services in the following order, and returns false on any startup failure: * + * <p><ol> + * <li>{@link PinotServiceManager}</li> + * <li>Bootstrap services in role {@link ServiceRole#CONTROLLER}</li> + * <li>All remaining bootstrap services in parallel</li> + * </ol> */ public class StartServiceManagerCommand extends AbstractBaseAdminCommand implements Command { private static final Logger LOGGER = LoggerFactory.getLogger(StartServiceManagerCommand.class); - private final List<Map<String, Object>> _bootstrapConfigurations = new ArrayList<>(); - private final String[] BOOTSTRAP_SERVICES = new String[]{"CONTROLLER", "BROKER", "SERVER"}; + private static final long startTick = System.nanoTime(); + private static final String[] BOOTSTRAP_SERVICES = new String[]{"CONTROLLER", "BROKER", "SERVER"}; + // multiple instances allowed per role for testing many minions + private final List<Entry<ServiceRole, Map<String, Object>>> _bootstrapConfigurations = new ArrayList<>(); @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help; @@ -150,21 +162,28 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme throws Exception { try { LOGGER.info("Executing command: " + toString()); - _pinotServiceManager = new PinotServiceManager(_zkAddress, _clusterName, _port); - _pinotServiceManager.start(); + if (!startPinotService("SERVICE_MANAGER", this::startServiceManager)) { + return false; + } + if (_bootstrapConfigPaths != null) { for (String configPath : _bootstrapConfigPaths) { - _bootstrapConfigurations.add(readConfigFromFile(configPath)); + Map<String, Object> config = readConfigFromFile(configPath); + ServiceRole role = ServiceRole.valueOf(config.get(PINOT_SERVICE_ROLE).toString()); + addBootstrapService(role, config); } } else if (_bootstrapServices != null) { for (String service : _bootstrapServices) { - ServiceRole serviceRole = ServiceRole.valueOf(service.toUpperCase()); - addBootstrapService(serviceRole, getDefaultConfig(serviceRole)); + ServiceRole role = ServiceRole.valueOf(service.toUpperCase()); + Map<String, Object> config = getDefaultConfig(role); + addBootstrapService(role, config); } } - for (Map<String, Object> properties : _bootstrapConfigurations) { - startPinotService(properties); + + if (!startBootstrapServices()) { + return false; } + String pidFile = ".pinotAdminService-" + System.currentTimeMillis() + ".pid"; savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile); return true; @@ -175,6 +194,12 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme } } + private String startServiceManager() { + _pinotServiceManager = new PinotServiceManager(_zkAddress, _clusterName, _port); + _pinotServiceManager.start(); + return _pinotServiceManager.getInstanceId(); + } + private Map<String, Object> getDefaultConfig(ServiceRole serviceRole) throws SocketException, UnknownHostException { switch (serviceRole) { @@ -191,26 +216,91 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme } } - private void startPinotService(Map<String, Object> properties) { - startPinotService(ServiceRole.valueOf(properties.get(PINOT_SERVICE_ROLE).toString()), properties); + /** + * Starts a controller synchronously unless the cluster already exists. Other services start in parallel. + */ + private boolean startBootstrapServices() { + if (_bootstrapConfigurations.isEmpty()) return true; + + List<Entry<ServiceRole, Map<String, Object>>> parallelConfigs = new ArrayList<>(); + + // Start controller(s) synchronously so that other services don't fail + // + // Note: Technically, we don't need to do this if the cluster already exists, but checking the + // cluster takes time and clutters logs with errors when it doesn't exist. + for (Entry<ServiceRole, Map<String, Object>> roleToConfig : _bootstrapConfigurations) { + if (roleToConfig.getKey() == ServiceRole.CONTROLLER) { + if (!startPinotService(ServiceRole.CONTROLLER, + () -> _pinotServiceManager.startRole(ServiceRole.CONTROLLER, roleToConfig.getValue()))) { + return false; + } + } else { + parallelConfigs.add(roleToConfig); + } + } + + return startBootstrapServicesInParallel(_pinotServiceManager, parallelConfigs); } - public boolean startPinotService(ServiceRole role, Map<String, Object> properties) { + static boolean startBootstrapServicesInParallel( + PinotServiceManager pinotServiceManager, + List<Entry<ServiceRole, Map<String, Object>>> parallelConfigs + ) { + if (parallelConfigs.isEmpty()) return true; + + // True is when everything succeeded + AtomicBoolean failed = new AtomicBoolean(false); + + List<Thread> threads = new ArrayList<>(); + for (Entry<ServiceRole, Map<String, Object>> roleToConfig : parallelConfigs) { + ServiceRole role = roleToConfig.getKey(); + Map<String, Object> config = roleToConfig.getValue(); + Thread thread = new Thread("Start a Pinot [" + role + "]") { + @Override public void run() { + if (!startPinotService(role, () -> pinotServiceManager.startRole(role, config))) { + failed.set(true); + } + } + }; + threads.add(thread); + // Unhandled exceptions are likely logged, so we don't need to re-log here + thread.setUncaughtExceptionHandler((t, e) -> failed.set(true)); + thread.start(); + } + + // Block until service startup completes + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return !failed.get(); + } + + private static boolean startPinotService(Object role, Callable<String> serviceStarter) { try { - String instanceId = _pinotServiceManager.startRole(role, properties); - LOGGER.info("Started Pinot [{}] Instance [{}].", role, instanceId); + LOGGER.info("Starting a Pinot [{}] at {}s since launch", role, startOffsetSeconds()); + String instanceId = serviceStarter.call(); + LOGGER.info("Started Pinot [{}] instance [{}] at {}s since launch", role, instanceId, startOffsetSeconds()); } catch (Exception e) { - LOGGER.error(String.format("Failed to start a [ %s ] Service", role), e); + LOGGER.error(String.format("Failed to start a Pinot [%s] at %s since launch", role, startOffsetSeconds()), e); return false; } return true; } - public StartServiceManagerCommand addBootstrapService(ServiceRole role, Map<String, Object> properties) { - properties.put(PINOT_SERVICE_ROLE, role.toString()); - - _bootstrapConfigurations.add(properties); - + /** Creates millis precision unit of seconds. ex 1.002 */ + private static float startOffsetSeconds() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTick) / 1000f; + } + + public StartServiceManagerCommand addBootstrapService(ServiceRole role, Map<String, Object> config) { + if (role == null) throw new NullPointerException("role == null"); + config.put(PINOT_SERVICE_ROLE, role.toString()); // Ensure config has role key + _bootstrapConfigurations.add(new SimpleImmutableEntry<>(role, config)); return this; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org