Repository: samza
Updated Branches:
  refs/heads/master e6eb13f85 -> 072457a2e


SAMZA-1447; Swapping out CLI JobStatusProvider for REST based implementation in 
samza-rest

Removing the YarnCliJobStatusProvider since forking a new shell for every 
request on the JobsResource endpoint is resource intensive.

Author: Abhishek Shivanna <[email protected]>

Reviewers: Jagadish<[email protected]>

Closes #317 from abhishekshivanna/samza-rest-oom


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/072457a2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/072457a2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/072457a2

Branch: refs/heads/master
Commit: 072457a2e44e5e041e87e0aafbef1dfbe332f54c
Parents: e6eb13f
Author: Abhishek Shivanna <[email protected]>
Authored: Wed Oct 11 13:57:20 2017 -0700
Committer: Jagadish <[email protected]>
Committed: Wed Oct 11 13:57:20 2017 -0700

----------------------------------------------------------------------
 .../learn/documentation/rest/JobsResource.png   | Bin 191199 -> 152138 bytes
 .../versioned/rest/resources/jobs.md            |   5 +-
 .../rest/model/yarn/YarnApplicationInfo.java    |  86 +++++++++++
 .../rest/proxy/job/SimpleYarnJobProxy.java      |   6 +-
 .../proxy/job/YarnCliJobStatusProvider.java     | 154 -------------------
 .../proxy/job/YarnRestJobStatusProvider.java    | 146 ++++++++++++++++++
 .../rest/resources/YarnJobResourceConfig.java   |  53 +++++++
 7 files changed, 293 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/docs/img/versioned/learn/documentation/rest/JobsResource.png
----------------------------------------------------------------------
diff --git a/docs/img/versioned/learn/documentation/rest/JobsResource.png 
b/docs/img/versioned/learn/documentation/rest/JobsResource.png
index a208c3d..6eebcb4 100644
Binary files a/docs/img/versioned/learn/documentation/rest/JobsResource.png and 
b/docs/img/versioned/learn/documentation/rest/JobsResource.png differ

http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/docs/learn/documentation/versioned/rest/resources/jobs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/rest/resources/jobs.md 
b/docs/learn/documentation/versioned/rest/resources/jobs.md
index 8282a5d..f6fa2f9 100644
--- a/docs/learn/documentation/versioned/rest/resources/jobs.md
+++ b/docs/learn/documentation/versioned/rest/resources/jobs.md
@@ -295,7 +295,7 @@ After validating each request, the JobsResource invokes the 
appropriate JobProxy
 
 The provided 
[SimpleInstallationFinder](../javadocs/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.html)
 crawls the file system, starting in the directory specified by the 
`job.installations.path` looking for valid Samza job config files. It extracts 
the `job.name` and `job.id` property values and creates an 
[InstallationRecord](../javadocs/org/apache/samza/rest/proxy/installation/InstallationRecord.html)
 for the each job instance. The InstallationRecord contains all the information 
needed to start, stop, and get the status for the job.
 
-The provided 
[YarnCliJobStatusProvider](../javadocs/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.html)
 leverages a ScriptRunner to fetch job status using the Yarn ApplicationCLI.
+The provided 
[YarnRestJobStatusProvider](../javadocs/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.html)
 uses the Resource Manager's REST API to fetch the job status.
 
 The 
[SimpleYarnJobProxy](../javadocs/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.html)
 relies on the scripts in the InstallationRecord scriptFilePath (`/bin`) 
directory to start and stop jobs.
 
@@ -323,5 +323,8 @@ The JobsResource properties should be specified in the same 
file as the Samza RE
     <tr>
       <td>job.config.factory.class</td><td>The config factory to use for 
reading Samza job configs. This is used to fetch the job.name and job.id 
properties for each job instance in the InstallationRecord. It's also used to 
validate that a particular directory within the installations path actually 
contains Samza jobs. If not specified 
<pre>org.apache.samza.config.factories.PropertiesConfigFactory</pre> will be 
used. </td>
     </tr>
+    <tr>
+      <td>yarn.resourcemanager.api.endpoint</td><td> An optional config if 
YARN is used as the cluster manager. This provides the host and port at which 
the YARN ResourceManager REST API is exposed. If not specified 
<pre>localhost:8088</pre> will be used.</td>
+    </tr>
   </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
new file mode 100644
index 0000000..1c7f757
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.List;
+import org.codehaus.jackson.map.annotate.JsonRootName;
+
+
+@JsonRootName("apps")
+public class YarnApplicationInfo {
+  @JsonProperty("app")
+  private List<YarnApplication> apps;
+
+  public YarnApplicationInfo() {
+  }
+
+  public YarnApplicationInfo(List<YarnApplication> apps) {
+    this.apps = apps;
+  }
+
+  /**
+   * Returns a Map with all the apps and their names as the key.
+   */
+  public Map<String, YarnApplication> getApplications() {
+    Map<String, YarnApplication> applications = new HashMap<>();
+    for (YarnApplication app: this.apps) {
+      applications.put(app.getName(), app);
+    }
+    return applications;
+  }
+
+  /**
+   * Constructs the job name used in YARN. This is the value returned by the 
"name"
+   * attribute form the Resource Manager API /ws/v1/cluster/apps.
+   *
+   * @param jobInstance the instance of the job.
+   * @return the job name to use for the job in YARN.
+   */
+  public static String getQualifiedJobName(JobInstance jobInstance) {
+    final String JOB_NAME_ID_FORMAT = "%s_%s";
+    return String.format(JOB_NAME_ID_FORMAT, jobInstance.getJobName(), 
jobInstance.getJobId());
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class YarnApplication {
+    private String state;
+    private String name;
+
+    public YarnApplication() {
+    }
+
+    public YarnApplication(String state, String name) {
+      this.state = state;
+      this.name = name;
+    }
+
+    public String getState() {
+      return state;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
index 677be1a..fbddb30 100644
--- 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -21,6 +21,7 @@ package org.apache.samza.rest.proxy.job;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
@@ -41,7 +42,7 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
   private static final String CONFIG_FACTORY_PARAM = 
"--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory";
   private static final String CONFIG_PATH_PARAM_FORMAT = 
"--config-path=file://%s";
 
-  private final JobStatusProvider statusProvider = new 
YarnCliJobStatusProvider(this);
+  private final JobStatusProvider statusProvider;
 
   private final InstallationFinder installFinder;
 
@@ -49,6 +50,7 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
     super(config);
     this.installFinder = new 
SimpleInstallationFinder(config.getInstallationsPath(),
                                                       
ClassLoaderHelper.fromClassName(config.getJobConfigFactory()));
+    this.statusProvider = new YarnRestJobStatusProvider(config);
   }
 
   @Override
@@ -78,7 +80,7 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
     }
 
     String scriptPath = getScriptPath(jobInstance, STOP_SCRIPT_NAME);
-    int resultCode = scriptRunner.runScript(scriptPath, 
YarnCliJobStatusProvider.getQualifiedJobName(jobInstance));
+    int resultCode = scriptRunner.runScript(scriptPath, 
YarnApplicationInfo.getQualifiedJobName(jobInstance));
     if (resultCode != 0) {
       throw new SamzaException("Failed to stop job. Result code: " + 
resultCode);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
deleted file mode 100644
index d1f34e8..0000000
--- 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.rest.proxy.job;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.samza.SamzaException;
-import org.apache.samza.rest.model.Job;
-import org.apache.samza.rest.model.JobStatus;
-import org.apache.samza.rest.script.ScriptOutputHandler;
-import org.apache.samza.rest.script.ScriptPathProvider;
-import org.apache.samza.rest.script.ScriptRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * An implementation of the {@link JobStatusProvider} that retrieves
- * the job status from the YARN command line interface.
- */
-public class YarnCliJobStatusProvider implements JobStatusProvider {
-  private static final Logger log = 
LoggerFactory.getLogger(YarnCliJobStatusProvider.class);
-  private static final String JOB_NAME_ID_FORMAT = "%s_%s";
-  private final ScriptPathProvider scriptPathProvider;
-
-  /**
-   * Constructs the job name used in YARN. This is the value shown in the 
"Name"
-   * column of the Resource Manager UI.
-   *
-   * @param jobInstance the instance of the job.
-   * @return            the job name to use for the job in YARN.
-   */
-  public static String getQualifiedJobName(JobInstance jobInstance) {
-    return String.format(JOB_NAME_ID_FORMAT, jobInstance.getJobName(), 
jobInstance.getJobId());
-  }
-
-  /**
-   * Default constructor.
-   *
-   * @param provider a delegate that provides the path to the Samza yarn 
scripts.
-   */
-  public YarnCliJobStatusProvider(ScriptPathProvider provider) {
-    scriptPathProvider = provider;
-  }
-
-  @Override
-  public void getJobStatuses(Collection<Job> jobs)
-      throws IOException, InterruptedException {
-    if (jobs == null || jobs.isEmpty()) {
-      return;
-    }
-
-    // If the scripts are in the jobs, they will be in all job installations, 
so just pick one and get the script path.
-    Job anyJob = jobs.iterator().next();
-    String scriptPath = scriptPathProvider.getScriptPath(new 
JobInstance(anyJob.getJobName(), anyJob.getJobId()), "run-class.sh");
-
-    // We will identify jobs returned by the YARN application states by their 
qualified names, so build a map
-    // to translate back from that name to the JobInfo we wish to populate. 
This avoids parsing/delimiter issues.
-    final Map<String, Job> qualifiedJobToInfo = new HashMap<>();
-    for(Job job : jobs) {
-      qualifiedJobToInfo.put(getQualifiedJobName(new 
JobInstance(job.getJobName(), job.getJobId())), job);
-    }
-
-    // Run "application -list" command and get the YARN state for each 
application
-    ScriptRunner runner = new ScriptRunner();
-    int resultCode = runner.runScript(scriptPath, new ScriptOutputHandler() {
-      @Override
-      public void processScriptOutput(InputStream output)
-          throws IOException {
-        InputStreamReader isr = new InputStreamReader(output);
-        BufferedReader br = new BufferedReader(isr);
-        String line;
-        String APPLICATION_PREFIX = "application_";
-        log.debug("YARN status:");
-        while ((line = br.readLine()) != null) {
-          log.debug(line);
-          if (line.startsWith(APPLICATION_PREFIX)) {
-            String[] columns = line.split("\\s+");
-            String qualifiedName = columns[1];
-            String yarnState = columns[5];
-
-            JobStatus samzaStatus = 
yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnState.toUpperCase()));
-            Job job = qualifiedJobToInfo.get(qualifiedName);
-
-            // If job is null, it wasn't requested.  The default status is 
STOPPED because there could be many
-            // application attempts in that status. Only update the job status 
if it's not STOPPED.
-            if (job != null && (job.getStatusDetail() == null || samzaStatus 
!= JobStatus.STOPPED)) {
-              job.setStatusDetail(yarnState);
-              job.setStatus(samzaStatus);
-            }
-          }
-        }
-      }
-    }, "org.apache.hadoop.yarn.client.cli.ApplicationCLI", "application", 
"-list", "-appStates", "ALL");
-
-    if (resultCode != 0) {
-      throw new SamzaException("Failed to get job status. Result code: " + 
resultCode);
-    }
-  }
-
-  @Override
-  public Job getJobStatus(JobInstance jobInstance)
-      throws IOException, InterruptedException {
-    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
-    getJobStatuses(Collections.singletonList(info));
-    return info;
-  }
-
-  /**
-   * Translates the YARN application state to the more generic Samza job 
status.
-   *
-   * @param yarnState the YARN application state to translate.
-   * @return          the corresponding Samza job status.
-   */
-  private JobStatus yarnStateToSamzaStatus(YarnApplicationState yarnState) {
-    switch (yarnState) {
-      case RUNNING:
-        return JobStatus.STARTED;
-      case NEW:
-      case NEW_SAVING:
-      case SUBMITTED:
-      case ACCEPTED:
-        return JobStatus.STARTING;
-      case FINISHED:
-      case FAILED:
-      case KILLED:
-      default:
-        return JobStatus.STOPPED;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
new file mode 100644
index 0000000..63a1ae4
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.rest.resources.YarnJobResourceConfig;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the {@link JobStatusProvider} that retrieves
+ * the job status from the YARN REST api.
+ */
+public class YarnRestJobStatusProvider implements JobStatusProvider {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnRestJobStatusProvider.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final String apiEndpoint;
+  private final HttpClient httpClient;
+
+  public YarnRestJobStatusProvider(JobsResourceConfig config) {
+    YarnJobResourceConfig yarnConfig = new YarnJobResourceConfig(config);
+    this.httpClient = new HttpClient();
+    OBJECT_MAPPER.configure(DeserializationConfig.Feature.UNWRAP_ROOT_VALUE, 
true);
+    this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps";,
+        yarnConfig.getYarnResourceManagerEndpoint());
+  }
+
+  @Override
+  public void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException {
+    if (jobs == null || jobs.isEmpty()) {
+      return;
+    }
+    try {
+      byte[] response = httpGet(apiEndpoint);
+      YarnApplicationInfo yarnApplicationInfo = 
OBJECT_MAPPER.readValue(response, YarnApplicationInfo.class);
+      Map<String, YarnApplicationInfo.YarnApplication> yarnApplications = 
yarnApplicationInfo.getApplications();
+      for (Job job: jobs) {
+        String qualifiedJobName = YarnApplicationInfo.getQualifiedJobName(new 
JobInstance(job.getJobName(), job.getJobId()));
+        YarnApplicationInfo.YarnApplication yarnApp = 
yarnApplications.get(qualifiedJobName);
+        if (yarnApp == null) {
+          job.setStatusDetail(JobStatus.UNKNOWN.toString());
+          job.setStatus(JobStatus.UNKNOWN);
+          continue;
+        }
+        JobStatus samzaStatus = 
yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnApp.getState().toUpperCase()));
+        if (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED) 
{
+          job.setStatusDetail(yarnApp.getState());
+          job.setStatus(samzaStatus);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to retrieve node info.", e);
+    }
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
+    getJobStatuses(Collections.singletonList(info));
+    return info;
+  }
+
+  /**
+   * Translates the YARN application state to the more generic Samza job 
status.
+   *
+   * @param yarnState the YARN application state to translate.
+   * @return the corresponding Samza job status.
+   */
+  private JobStatus yarnStateToSamzaStatus(YarnApplicationState yarnState) {
+    switch (yarnState) {
+      case RUNNING:
+        return JobStatus.STARTED;
+      case NEW:
+      case NEW_SAVING:
+      case SUBMITTED:
+      case ACCEPTED:
+        return JobStatus.STARTING;
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+      default:
+        return JobStatus.STOPPED;
+    }
+  }
+
+  /**
+   * Issues a HTTP Get request to the provided url and returns the response
+   * @param requestUrl the request url
+   * @return the response
+   * @throws IOException if there are problems with the http get request.
+   */
+  private byte[] httpGet(String requestUrl)
+      throws IOException {
+    GetMethod getMethod = new GetMethod(requestUrl);
+    try {
+      int responseCode = this.httpClient.executeMethod(getMethod);
+      LOGGER.debug("Received response code: {} for the get request on the url: 
{}", responseCode, requestUrl);
+      byte[] response = getMethod.getResponseBody();
+      if (responseCode != HttpStatus.SC_OK) {
+        throw new SamzaException(
+            String.format("Received response code: %s for get request on: %s, 
with message: %s.", responseCode,
+                requestUrl, StringUtils.newStringUtf8(response)));
+      }
+      return response;
+    } finally {
+      getMethod.releaseConnection();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/072457a2/samza-rest/src/main/java/org/apache/samza/rest/resources/YarnJobResourceConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/resources/YarnJobResourceConfig.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/YarnJobResourceConfig.java
new file mode 100644
index 0000000..56ff431
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/YarnJobResourceConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+/**
+ * Configurations for the YARN based {@link JobsResource} endpoint.
+ */
+public class YarnJobResourceConfig extends JobsResourceConfig {
+  public YarnJobResourceConfig(JobsResourceConfig config) {
+    super(config);
+  }
+
+  /**
+   * Specifies the host and port of the YARN ResourceManager against against 
which the API
+   * calls are made.
+   */
+  private static final String CONFIG_YARN_RESOURCE_MANAGER_API_ENDPOINT = 
"yarn.resourcemanager.api.endpoint";
+
+  /**
+   * Specifies the default host and port of the YARN ResourceManager assuming 
that samza-rest is running on the
+   * Resource Manager host.
+   */
+  private static final String 
CONFIG_DEFAULT_YARN_RESOURCE_MANAGER_API_ENDPOINT = "localhost:8088";
+
+  /**
+   * Returns the configured YARN Resource Manager REST API endpoint.
+   * @see YarnJobResourceConfig#CONFIG_YARN_RESOURCE_MANAGER_API_ENDPOINT
+   * @return the ResourceManager host and port on which the REST API is 
exposed.
+   */
+  public String getYarnResourceManagerEndpoint() {
+    String api = get(CONFIG_YARN_RESOURCE_MANAGER_API_ENDPOINT);
+    if (api == null) {
+      api = CONFIG_DEFAULT_YARN_RESOURCE_MANAGER_API_ENDPOINT;
+    }
+    return api;
+  }
+}

Reply via email to