Repository: hive
Updated Branches:
  refs/heads/master 8fd767079 -> af3b21239


HIVE-15217: Add watch mode to llap status tool (Prasanth Jayachandran
reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/af3b2123
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/af3b2123
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/af3b2123

Branch: refs/heads/master
Commit: af3b212395c400aaa653e2d31124d24658f1aa8b
Parents: 8fd7670
Author: Prasanth Jayachandran <[email protected]>
Authored: Thu Nov 24 06:30:38 2016 -0600
Committer: Prasanth Jayachandran <[email protected]>
Committed: Thu Nov 24 06:30:46 2016 -0600

----------------------------------------------------------------------
 .../llap/cli/LlapStatusOptionsProcessor.java    |  72 +++-
 .../hive/llap/cli/LlapStatusServiceDriver.java  | 376 +++++++++++--------
 2 files changed, 278 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/af3b2123/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
index 306391b..a501b6c 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.llap.cli;
 
+import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -27,17 +28,15 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class LlapStatusOptionsProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(LlapStatusOptionsProcessor.class);
-
   private static final String LLAPSTATUS_CONSTANT = "llapstatus";
 
   private static final long FIND_YARN_APP_TIMEOUT_MS = 20 * 1000l; // 
20seconds to wait for app to be visible
 
+  private static final long DEFAULT_STATUS_REFRESH_INTERVAL_MS = 1 * 1000l; // 
1 second wait between subsequent status checks
+  private static final long DEFAULT_WATCH_MODE_TIMEOUT_MS = 5 * 60 * 1000l; // 
5 minutes timeout for watch mode
   enum OptionConstants {
 
     NAME("name", 'n', "LLAP cluster name", true),
@@ -45,6 +44,14 @@ public class LlapStatusOptionsProcessor {
         "Amount of time(s) that the tool will sleep to wait for the YARN 
application to start. negative values=wait forever, 0=Do not wait. default=" +
             TimeUnit.SECONDS.convert(FIND_YARN_APP_TIMEOUT_MS, 
TimeUnit.MILLISECONDS) + "s", true),
     OUTPUT_FILE("outputFile", 'o', "File to which output should be written 
(Default stdout)", true),
+    WATCH_UNTIL_STATUS_CHANGE("watchUntil", 'w', "Watch until LLAP application 
status changes to the specified " +
+      "desired state before printing to console. Accepted values are " + 
Arrays.toString(LlapStatusServiceDriver.State
+      .values()), true),
+    STATUS_REFRESH_INTERVAL("refreshInterval", 'i', "Amount of time in seconds 
to wait until subsequent status checks" +
+      " in watch mode. Valid only for watch mode. (Default " +
+      TimeUnit.SECONDS.convert(DEFAULT_STATUS_REFRESH_INTERVAL_MS, 
TimeUnit.MILLISECONDS) + "s)", true),
+    WATCH_MODE_TIMEOUT("watchTimeout", 't', "Exit watch mode if the desired 
state is not attained until the specified" +
+      " timeout. (Default " + 
TimeUnit.SECONDS.convert(DEFAULT_WATCH_MODE_TIMEOUT_MS, TimeUnit.MILLISECONDS) 
+"s)", true),
     HIVECONF("hiveconf", null, "Use value for given property. Overridden by 
explicit parameters", "property=value", 2),
     HELP("help", 'H', "Print help information", false);
 
@@ -94,17 +101,26 @@ public class LlapStatusOptionsProcessor {
     private final Properties conf;
     private final long findAppTimeoutMs;
     private final String outputFile;
+    private final long refreshIntervalMs;
+    private final LlapStatusServiceDriver.State watchUntil;
+    private final long watchTimeout;
+
+    public LlapStatusOptions(String name) {
+      this(name, new Properties(), FIND_YARN_APP_TIMEOUT_MS, null, 
DEFAULT_STATUS_REFRESH_INTERVAL_MS, null,
+        DEFAULT_WATCH_MODE_TIMEOUT_MS);
+    }
 
     public LlapStatusOptions(String name, Properties hiveProperties, long 
findAppTimeoutMs,
-                             String outputFile) {
+                             String outputFile, long refreshIntervalMs,
+                             final LlapStatusServiceDriver.State watchUntil,
+                             final long watchTimeoutMs) {
       this.name = name;
       this.conf = hiveProperties;
       this.findAppTimeoutMs = findAppTimeoutMs;
       this.outputFile = outputFile;
-    }
-
-    public LlapStatusOptions(String name) {
-      this(name, new Properties(), FIND_YARN_APP_TIMEOUT_MS, null);
+      this.refreshIntervalMs = refreshIntervalMs;
+      this.watchUntil = watchUntil;
+      this.watchTimeout = watchTimeoutMs;
     }
 
     public String getName() {
@@ -122,6 +138,18 @@ public class LlapStatusOptionsProcessor {
     public String getOutputFile() {
       return outputFile;
     }
+
+    public long getRefreshIntervalMs() {
+      return refreshIntervalMs;
+    }
+
+    public LlapStatusServiceDriver.State getWatchUntilState() {
+      return watchUntil;
+    }
+
+    public long getWatchTimeoutMs() {
+      return watchTimeout;
+    }
   }
 
   private final Options options = new Options();
@@ -170,7 +198,31 @@ public class LlapStatusOptionsProcessor {
       outputFile = 
commandLine.getOptionValue(OptionConstants.OUTPUT_FILE.getLongOpt());
     }
 
-    return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile);
+    long refreshIntervalMs = DEFAULT_STATUS_REFRESH_INTERVAL_MS;
+    if 
(commandLine.hasOption(OptionConstants.STATUS_REFRESH_INTERVAL.getLongOpt())) {
+      long refreshIntervalSec = 
Long.parseLong(commandLine.getOptionValue(OptionConstants.STATUS_REFRESH_INTERVAL
+        .getLongOpt()));
+      if (refreshIntervalSec <= 0) {
+        throw new IllegalArgumentException("Refresh interval should be >0");
+      }
+      refreshIntervalMs = TimeUnit.MILLISECONDS.convert(refreshIntervalSec, 
TimeUnit.SECONDS);
+    }
+
+    LlapStatusServiceDriver.State watchUntil = null;
+    if 
(commandLine.hasOption(OptionConstants.WATCH_UNTIL_STATUS_CHANGE.getLongOpt())) 
{
+      String watchUntilStr = 
commandLine.getOptionValue(OptionConstants.WATCH_UNTIL_STATUS_CHANGE.getLongOpt());
+      watchUntil = LlapStatusServiceDriver.State.valueOf(watchUntilStr);
+    }
+
+    long watchTimeoutMs = DEFAULT_WATCH_MODE_TIMEOUT_MS;
+    if 
(commandLine.hasOption(OptionConstants.WATCH_MODE_TIMEOUT.getLongOpt())) {
+      long watchTimeoutSec = 
Long.parseLong(commandLine.getOptionValue(OptionConstants.WATCH_MODE_TIMEOUT.getLongOpt()));
+      if (watchTimeoutSec <= 0) {
+        throw new IllegalArgumentException("Watch timeout should be >0");
+      }
+      watchTimeoutMs = TimeUnit.MILLISECONDS.convert(watchTimeoutSec, 
TimeUnit.SECONDS);
+    }
+    return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile, 
refreshIntervalMs, watchUntil, watchTimeoutMs);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/af3b2123/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 0efe545..97a131e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -24,12 +24,14 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
@@ -99,20 +101,57 @@ public class LlapStatusServiceDriver {
 
   private static final String CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS =
       CONF_PREFIX + "zk-registry.timeout-ms";
-  private static final long CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT = 
10000l;
+  private static final long CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT = 
20000l;
 
-
-  private static final String AM_KEY = "slider-appmaster";
   private static final String LLAP_KEY = "LLAP";
-
   private final Configuration conf;
   private final Clock clock = new SystemClock();
+  private String appName = null;
+  private SliderClient sliderClient = null;
+  private Configuration llapRegistryConf = null;
+  private LlapRegistryService llapRegistry = null;
+
   @VisibleForTesting
-  final AppStatusBuilder appStatusBuilder = new AppStatusBuilder();
+  AppStatusBuilder appStatusBuilder;
 
   public LlapStatusServiceDriver() {
     SessionState ss = SessionState.get();
     conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
+    setupConf();
+  }
+
+  private void setupConf() {
+    for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+      conf.addResource(f);
+    }
+    conf.reloadConfiguration();
+
+    // Setup timeouts for various services.
+
+    // Once we move to a Hadoop-2.8 dependency, the following parameter can be 
used.
+    // 
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
+    conf.set("yarn.timeline-service.entity-group-fs-store.retry-policy-spec",
+      conf.get(CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
+        
CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC_DEFAULT));
+
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
+      conf.getLong(CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS,
+        CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS_DEFAULT));
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+      conf.getLong(CONFIG_YARN_RM_RETRY_INTERVAL_MS, 
CONFIG_YARN_RM_RETRY_INTERVAL_MS_DEFAULT));
+
+    
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+      conf.getInt(CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES,
+        CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT));
+    
conf.setLong(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
+      conf.getLong(CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS,
+        CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS_DEFAULT));
+
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, 
(conf
+      .getLong(CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS, 
CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT) +
+      "ms"));
+
+    llapRegistryConf = new Configuration(conf);
   }
 
   /**
@@ -135,66 +174,37 @@ public class LlapStatusServiceDriver {
   }
 
   public int run(LlapStatusOptions options) {
-
-    SliderClient sliderClient = null;
+    appStatusBuilder = new AppStatusBuilder();
     try {
+      if (appName == null) {
+        // user provided configs
+        for (Map.Entry<Object, Object> props : options.getConf().entrySet()) {
+          conf.set((String) props.getKey(), (String) props.getValue());
+        }
 
-      for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
-        conf.addResource(f);
-      }
-      conf.reloadConfiguration();
-      for (Map.Entry<Object, Object> props : options.getConf().entrySet()) {
-        conf.set((String) props.getKey(), (String) props.getValue());
-      }
-
-      // Setup timeouts for various services.
-
-      // Once we move to a Hadoop-2.8 dependency, the following paramteer can 
be used.
-      // 
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
-      conf.set("yarn.timeline-service.entity-group-fs-store.retry-policy-spec",
-          
conf.get(CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
-              
CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC_DEFAULT));
-
-      conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
-          conf.getLong(CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS,
-              CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS_DEFAULT));
-      conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
-          conf.getLong(CONFIG_YARN_RM_RETRY_INTERVAL_MS, 
CONFIG_YARN_RM_RETRY_INTERVAL_MS_DEFAULT));
-
-      
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
-          conf.getInt(CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES,
-              CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT));
-      
conf.setLong(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
-          conf.getLong(CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS,
-              CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS_DEFAULT));
-
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, 
(conf
-          .getLong(CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS, 
CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT) +
-          "ms"));
-
-
-
-      String appName;
-      appName = options.getName();
-      if (StringUtils.isEmpty(appName)) {
-        appName = HiveConf.getVar(conf, 
HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-        if (appName.startsWith("@") && appName.length() > 1) {
-          // This is a valid slider app name. Parse it out.
-          appName = appName.substring(1);
-        } else {
-          // Invalid app name. Checked later.
-          appName = null;
+        appName = options.getName();
+        if (StringUtils.isEmpty(appName)) {
+          appName = HiveConf.getVar(conf, 
HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+          if (appName.startsWith("@") && appName.length() > 1) {
+            // This is a valid slider app name. Parse it out.
+            appName = appName.substring(1);
+          } else {
+            // Invalid app name. Checked later.
+            appName = null;
+          }
         }
-      }
-      if (StringUtils.isEmpty(appName)) {
-        String message =
+        if (StringUtils.isEmpty(appName)) {
+          String message =
             "Invalid app name. This must be setup via config or passed in as a 
parameter." +
-                " This tool works with clusters deployed by Slider/YARN";
-        LOG.info(message);
-        return ExitCode.INCORRECT_USAGE.getInt();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Using appName: {}", appName);
+              " This tool works with clusters deployed by Slider/YARN";
+          LOG.info(message);
+          return ExitCode.INCORRECT_USAGE.getInt();
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Using appName: {}", appName);
+        }
+
+        
llapRegistryConf.set(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + 
appName);
       }
 
       try {
@@ -205,7 +215,7 @@ public class LlapStatusServiceDriver {
       }
 
       // Get the App report from YARN
-      ApplicationReport appReport = null;
+      ApplicationReport appReport;
       try {
         appReport = getAppReport(appName, sliderClient, 
options.getFindAppTimeoutMs());
       } catch (LlapStatusCliException e) {
@@ -225,7 +235,7 @@ public class LlapStatusServiceDriver {
       if (ret != ExitCode.SUCCESS) {
         return ret.getInt();
       } else if (EnumSet.of(State.APP_NOT_FOUND, State.COMPLETE, 
State.LAUNCHING)
-          .contains(appStatusBuilder.getState())) {
+        .contains(appStatusBuilder.getState())) {
         return ExitCode.SUCCESS.getInt();
       } else {
         // Get information from slider.
@@ -238,24 +248,22 @@ public class LlapStatusServiceDriver {
         }
       }
 
-      if (ret !=ExitCode.SUCCESS ) {
+      if (ret != ExitCode.SUCCESS) {
         return ret.getInt();
       } else {
         try {
-          ret = populateAppStatusFromLlapRegistry(appName, appStatusBuilder);
+          ret = populateAppStatusFromLlapRegistry(appStatusBuilder);
         } catch (LlapStatusCliException e) {
           logError(e);
           return e.getExitCode().getInt();
         }
       }
+
       return ret.getInt();
-    }finally {
+    } finally {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Final AppState: " + appStatusBuilder.toString());
       }
-      if (sliderClient != null) {
-        sliderClient.stop();
-      }
     }
   }
 
@@ -274,7 +282,10 @@ public class LlapStatusServiceDriver {
   }
 
   private SliderClient createSliderClient() throws LlapStatusCliException {
-    SliderClient sliderClient;
+    if (sliderClient != null) {
+      return sliderClient;
+    }
+
     try {
       sliderClient = new SliderClient() {
         @Override
@@ -285,17 +296,16 @@ public class LlapStatusServiceDriver {
       };
       Configuration sliderClientConf = new Configuration(conf);
       sliderClientConf = sliderClient.bindArgs(sliderClientConf,
-          new String[] { "help" });
+        new String[]{"help"});
       sliderClient.init(sliderClientConf);
       sliderClient.start();
       return sliderClient;
     } catch (Exception e) {
       throw new 
LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_CREATE_FAILED,
-          "Failed to create slider client", e);
+        "Failed to create slider client", e);
     }
   }
 
-
   private ApplicationReport getAppReport(String appName, SliderClient 
sliderClient,
                                          long timeoutMs) throws 
LlapStatusCliException {
 
@@ -466,94 +476,87 @@ public class LlapStatusServiceDriver {
 
 
   /**
-   *
-   * @param appName
    * @param appStatusBuilder
    * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies 
future progress not possible
    * @throws LlapStatusCliException
    */
-  private ExitCode populateAppStatusFromLlapRegistry(String appName, 
AppStatusBuilder appStatusBuilder) throws
-      LlapStatusCliException {
-    Configuration llapRegistryConf= new Configuration(conf);
-    llapRegistryConf
-        .set(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + 
appName);
-    LlapRegistryService llapRegistry;
-    try {
-      llapRegistry = LlapRegistryService.getClient(llapRegistryConf);
-    } catch (Exception e) {
-      throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR,
-          "Failed to create llap registry client", e);
-    }
-    try {
-      Collection<ServiceInstance> serviceInstances;
+  private ExitCode populateAppStatusFromLlapRegistry(AppStatusBuilder 
appStatusBuilder) throws
+    LlapStatusCliException {
+
+    if (llapRegistry == null) {
       try {
-        serviceInstances = llapRegistry.getInstances().getAll();
-      } catch (IOException e) {
-        throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed 
to get instances from llap registry", e);
+        llapRegistry = LlapRegistryService.getClient(llapRegistryConf);
+      } catch (Exception e) {
+        throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR,
+          "Failed to create llap registry client", e);
       }
+    }
 
-      if (serviceInstances == null || serviceInstances.isEmpty()) {
-        LOG.info("No information found in the LLAP registry");
-        appStatusBuilder.setLiveInstances(0);
-        appStatusBuilder.setState(State.LAUNCHING);
-        appStatusBuilder.clearLlapInstances();
-        return ExitCode.SUCCESS;
-      } else {
-        // Tracks instances known by both slider and llap.
-        List<LlapInstance> validatedInstances = new LinkedList<>();
-        List<String> llapExtraInstances = new LinkedList<>();
-
-        for (ServiceInstance serviceInstance : serviceInstances) {
-          String containerIdString = serviceInstance.getProperties().get(
-              HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
-
-          LlapInstance llapInstance = 
appStatusBuilder.removeAndgetLlapInstanceForContainer(
-              containerIdString);
-          if (llapInstance != null) {
-            llapInstance.setMgmtPort(serviceInstance.getManagementPort());
-            llapInstance.setRpcPort(serviceInstance.getRpcPort());
-            llapInstance.setShufflePort(serviceInstance.getShufflePort());
-            llapInstance.setWebUrl(serviceInstance.getServicesAddress());
-            llapInstance.setStatusUrl(serviceInstance.getServicesAddress() + 
"/status");
-            validatedInstances.add(llapInstance);
-          } else {
-            // This likely indicates that an instance has recently restarted
-            // (the old instance has not been unregistered), and the new 
instances has not registered yet.
-            llapExtraInstances.add(containerIdString);
-            // This instance will not be added back, since it's services are 
not up yet.
-          }
-
-        }
+    Collection<ServiceInstance> serviceInstances;
+    try {
+      serviceInstances = llapRegistry.getInstances().getAll();
+    } catch (IOException e) {
+      throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed 
to get instances from llap registry", e);
+    }
 
-        appStatusBuilder.setLiveInstances(validatedInstances.size());
-        if (validatedInstances.size() >= 
appStatusBuilder.getDesiredInstances()) {
-          appStatusBuilder.setState(State.RUNNING_ALL);
-          if (validatedInstances.size() > 
appStatusBuilder.getDesiredInstances()) {
-            LOG.warn("Found more entries in LLAP registry, as compared to 
desired entries");
-          }
+    if (serviceInstances == null || serviceInstances.isEmpty()) {
+      LOG.info("No information found in the LLAP registry");
+      appStatusBuilder.setLiveInstances(0);
+      appStatusBuilder.setState(State.LAUNCHING);
+      appStatusBuilder.clearLlapInstances();
+      return ExitCode.SUCCESS;
+    } else {
+      // Tracks instances known by both slider and llap.
+      List<LlapInstance> validatedInstances = new LinkedList<>();
+      List<String> llapExtraInstances = new LinkedList<>();
+
+      for (ServiceInstance serviceInstance : serviceInstances) {
+        String containerIdString = serviceInstance.getProperties().get(
+          HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
+
+        LlapInstance llapInstance = 
appStatusBuilder.removeAndgetLlapInstanceForContainer(
+          containerIdString);
+        if (llapInstance != null) {
+          llapInstance.setMgmtPort(serviceInstance.getManagementPort());
+          llapInstance.setRpcPort(serviceInstance.getRpcPort());
+          llapInstance.setShufflePort(serviceInstance.getShufflePort());
+          llapInstance.setWebUrl(serviceInstance.getServicesAddress());
+          llapInstance.setStatusUrl(serviceInstance.getServicesAddress() + 
"/status");
+          validatedInstances.add(llapInstance);
         } else {
-          appStatusBuilder.setState(State.RUNNING_PARTIAL);
+          // This likely indicates that an instance has recently restarted
+          // (the old instance has not been unregistered), and the new 
instance has not registered yet.
+          llapExtraInstances.add(containerIdString);
+          // This instance will not be added back, since its services are not 
up yet.
         }
 
-        // At this point, everything that can be consumed from 
AppStatusBuilder has been consumed.
-        // Debug only
-        if (appStatusBuilder.allInstances().size() > 0) {
-          // Containers likely to come up soon.
-          LOG.debug("Potential instances starting up: {}", 
appStatusBuilder.allInstances());
-        }
-        if (llapExtraInstances.size() > 0) {
-          // Old containers which are likely shutting down
-          LOG.debug("Instances likely to shutdown soon: {}", 
llapExtraInstances);
-        }
+      }
 
-        
appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances);
+      appStatusBuilder.setLiveInstances(validatedInstances.size());
+      if (validatedInstances.size() >= appStatusBuilder.getDesiredInstances()) 
{
+        appStatusBuilder.setState(State.RUNNING_ALL);
+        if (validatedInstances.size() > 
appStatusBuilder.getDesiredInstances()) {
+          LOG.warn("Found more entries in LLAP registry, as compared to 
desired entries");
+        }
+      } else {
+        appStatusBuilder.setState(State.RUNNING_PARTIAL);
+      }
 
+      // At this point, everything that can be consumed from AppStatusBuilder 
has been consumed.
+      // Debug only
+      if (appStatusBuilder.allInstances().size() > 0) {
+        // Containers likely to come up soon.
+        LOG.debug("Potential instances starting up: {}", 
appStatusBuilder.allInstances());
       }
-      return ExitCode.SUCCESS;
-    } finally {
-      llapRegistry.stop();
-    }
+      if (llapExtraInstances.size() > 0) {
+        // Old containers which are likely shutting down
+        LOG.debug("Instances likely to shutdown soon: {}", llapExtraInstances);
+      }
+
+      appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances);
 
+    }
+    return ExitCode.SUCCESS;
   }
 
 
@@ -564,8 +567,8 @@ public class LlapStatusServiceDriver {
     private String originalConfigurationPath;
     private String generatedConfigurationPath;
 
-    private Integer desiredInstances;
-    private Integer liveInstances;
+    private int desiredInstances = -1;
+    private int liveInstances = -1;
 
     private Long appStartTime;
     private Long appFinishTime;
@@ -658,11 +661,11 @@ public class LlapStatusServiceDriver {
       return generatedConfigurationPath;
     }
 
-    public Integer getDesiredInstances() {
+    public int getDesiredInstances() {
       return desiredInstances;
     }
 
-    public Integer getLiveInstances() {
+    public int getLiveInstances() {
       return liveInstances;
     }
 
@@ -916,7 +919,7 @@ public class LlapStatusServiceDriver {
 
 
   public static void main(String[] args) {
-    LOG.info("LLAP status invoked with arguments = {}", args);
+    LOG.info("LLAP status invoked with arguments = {}", Arrays.toString(args));
     int ret = ExitCode.SUCCESS.getInt();
 
     LlapStatusServiceDriver statusServiceDriver = null;
@@ -925,6 +928,7 @@ public class LlapStatusServiceDriver {
       statusServiceDriver = new LlapStatusServiceDriver();
       options = statusServiceDriver.parseOptions(args);
     } catch (Throwable t) {
+      statusServiceDriver.close();
       logError(t);
       if (t instanceof LlapStatusCliException) {
         LlapStatusCliException ce = (LlapStatusCliException) t;
@@ -934,20 +938,62 @@ public class LlapStatusServiceDriver {
       }
     }
     if (ret != 0 || options == null) { // Failure / help
+      if (statusServiceDriver != null) {
+        statusServiceDriver.close();
+      }
       System.exit(ret);
     }
 
-    try {
-      ret = statusServiceDriver.run(options);
-      if (ret == ExitCode.SUCCESS.getInt()) {
-        try (OutputStream os = options.getOutputFile() == null ? System.out :
-            new BufferedOutputStream(
-                new FileOutputStream(options.getOutputFile())); PrintWriter pw 
= new PrintWriter(
-            os)) {
-          statusServiceDriver.outputJson(pw);
+    final long refreshInterval = options.getRefreshIntervalMs();
+    final State watchUntilState = options.getWatchUntilState();
+    final long watchTimeout = options.getWatchTimeoutMs();
+    long numAttempts = watchTimeout / refreshInterval;
+    State currentState = null;
+    try (OutputStream os = options.getOutputFile() == null ? System.out :
+      new BufferedOutputStream(new FileOutputStream(options.getOutputFile()));
+         PrintWriter pw = new PrintWriter(os)) {
+
+      LOG.info("Configured refresh interval: {}s. Watch timeout: {}s. Attempts 
remaining: {}",
+        TimeUnit.SECONDS.convert(refreshInterval, TimeUnit.MILLISECONDS),
+        TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS),
+        numAttempts);
+      while (numAttempts > 0) {
+        try {
+          ret = statusServiceDriver.run(options);
+          if (ret == ExitCode.SUCCESS.getInt()) {
+            if (watchUntilState != null) {
+              currentState = statusServiceDriver.appStatusBuilder.state;
+              if (!currentState.equals(watchUntilState)) {
+                LOG.warn("Current state: {}. Desired state: {}. {}/{} 
instances.", currentState, watchUntilState,
+                  statusServiceDriver.appStatusBuilder.getLiveInstances(),
+                  statusServiceDriver.appStatusBuilder.getDesiredInstances());
+                numAttempts--;
+                continue;
+              }
+            }
+            // Desired state attained (or not in watch mode): print status and exit the loop.
+            statusServiceDriver.outputJson(pw);
+            os.flush();
+            pw.flush();
+          }
+          break;
+        } finally {
+          if (watchUntilState != null) {
+            try {
+              Thread.sleep(refreshInterval);
+            } catch (InterruptedException e) {
+              // Interrupt ignored. NOTE(review): consider restoring it via Thread.currentThread().interrupt()
+            }
+          } else {
+            // Not in watch mode: report the status only once.
+            break;
+          }
         }
       }
-
+      if (numAttempts == 0 && watchUntilState != null && currentState!= null 
&& !currentState.equals(watchUntilState)) {
+        LOG.info("Watch timeout {}s exhausted before desired state {} is 
attained.",
+          TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS), 
watchUntilState);
+      }
     } catch (Throwable t) {
       logError(t);
       if (t instanceof LlapStatusCliException) {
@@ -958,10 +1004,20 @@ public class LlapStatusServiceDriver {
       }
     } finally {
       LOG.info("LLAP status finished");
+      statusServiceDriver.close();
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Completed processing - exiting with " + ret);
     }
     System.exit(ret);
   }
+
+  private void close() {
+    if (sliderClient != null) {
+      sliderClient.stop();
+    }
+    if (llapRegistry != null) {
+      llapRegistry.stop();
+    }
+  }
 }

Reply via email to