SAMZA-975 - Initial Samza REST Implementation

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/260d1ff9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/260d1ff9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/260d1ff9

Branch: refs/heads/master
Commit: 260d1ff963eaa6ea6b0eb57ea9585a4751f40ddd
Parents: 17e65d1
Author: Jacob Maes <[email protected]>
Authored: Mon Aug 22 23:38:13 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Mon Aug 22 23:38:13 2016 -0700

----------------------------------------------------------------------
 LICENSE                                         |   7 +
 build.gradle                                    |  58 +++++
 gradle/dependency-versions.gradle               |   1 +
 .../src/main/bash/run-samza-rest-service.sh     |  22 ++
 .../src/main/config/samza-rest.properties       |  23 ++
 .../java/org/apache/samza/monitor/Monitor.java  |  39 ++++
 .../org/apache/samza/monitor/MonitorLoader.java |  43 ++++
 .../samza/monitor/SamzaMonitorService.java      |  95 +++++++++
 .../ScheduledExecutorSchedulingProvider.java    |  40 ++++
 .../samza/monitor/SchedulingProvider.java       |  35 ++++
 .../apache/samza/rest/SamzaRestApplication.java |  94 +++++++++
 .../org/apache/samza/rest/SamzaRestConfig.java  | 129 ++++++++++++
 .../org/apache/samza/rest/SamzaRestService.java | 159 ++++++++++++++
 .../java/org/apache/samza/rest/model/Job.java   |  81 +++++++
 .../org/apache/samza/rest/model/JobStatus.java  |  53 +++++
 .../proxy/installation/InstallationFinder.java  |  47 +++++
 .../proxy/installation/InstallationRecord.java  |  70 +++++++
 .../installation/SimpleInstallationFinder.java  | 131 ++++++++++++
 .../samza/rest/proxy/job/AbstractJobProxy.java  | 139 ++++++++++++
 .../samza/rest/proxy/job/JobInstance.java       |  78 +++++++
 .../apache/samza/rest/proxy/job/JobProxy.java   |  78 +++++++
 .../samza/rest/proxy/job/JobProxyFactory.java   |  41 ++++
 .../samza/rest/proxy/job/JobStatusProvider.java |  49 +++++
 .../samza/rest/proxy/job/ScriptJobProxy.java    |  71 +++++++
 .../rest/proxy/job/SimpleYarnJobProxy.java      | 114 ++++++++++
 .../proxy/job/SimpleYarnJobProxyFactory.java    |  35 ++++
 .../proxy/job/YarnCliJobStatusProvider.java     | 154 ++++++++++++++
 .../rest/resources/DefaultResourceFactory.java  |  35 ++++
 .../samza/rest/resources/JobsResource.java      | 175 ++++++++++++++++
 .../rest/resources/JobsResourceConfig.java      |  84 ++++++++
 .../samza/rest/resources/ResourceFactory.java   |  39 ++++
 .../samza/rest/script/ScriptOutputHandler.java  |  40 ++++
 .../samza/rest/script/ScriptPathProvider.java   |  37 ++++
 .../apache/samza/rest/script/ScriptRunner.java  | 129 ++++++++++++
 samza-rest/src/main/resources/log4j.xml         |  41 ++++
 .../samza/monitor/TestMonitorService.java       | 115 ++++++++++
 .../apache/samza/monitor/mock/DummyMonitor.java |  28 +++
 .../monitor/mock/ExceptionThrowingMonitor.java  |  29 +++
 .../monitor/mock/InstantSchedulingProvider.java |  34 +++
 .../samza/rest/resources/TestJobsResource.java  | 209 +++++++++++++++++++
 .../samza/rest/resources/mock/MockJobProxy.java |  81 +++++++
 .../resources/mock/MockJobProxyFactory.java     |  31 +++
 .../resources/mock/MockJobStatusProvider.java   |  50 +++++
 .../src/main/bash/kill-yarn-job-by-name.sh      |  38 ++++
 settings.gradle                                 |   3 +-
 45 files changed, 3083 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index e1439fe..1d81cd3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -252,3 +252,10 @@ SIL Open Font License (OFT) - http://scripts.sil.org/OFL/
 - Font-awesome font files v4.0.3 (http://fortawesome.github.io/Font-Awesome/)
 - Ropa Sans fonts v1.1 - Copyright (c) 2011, Botjo Nikoltchev, with Reserved 
Font Name "Ropa Sans"
 
+-----------------------------------------------------------------------
+ The CDDL License
+-----------------------------------------------------------------------
+
+The Apache Samza project bundles the following files under the CDDL License
+
+- Jersey v2.22.1 (https://jersey.java.net/license.html) - Copyright (c) 
2010-2015 Oracle Corporation
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1d4eb74..004c81e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -469,6 +469,64 @@ project(":samza-hdfs_$scalaVersion") {
     }
 }
 
+project(":samza-rest") {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(":samza-shell")
+    compile project(":samza-core_$scalaVersion")
+
+    runtime "org.slf4j:slf4j-log4j12:$slf4jVersion"
+    compile "com.google.guava:guava:$guavaVersion"
+    compile "org.glassfish.jersey.core:jersey-server:$jerseyVersion"
+    compile 
"org.glassfish.jersey.containers:jersey-container-servlet-core:$jerseyVersion"
+    compile 
"org.glassfish.jersey.containers:jersey-container-jetty-http:$jerseyVersion"
+    compile "org.glassfish.jersey.media:jersey-media-moxy:$jerseyVersion"
+    compile "org.eclipse.jetty.aggregate:jetty-all:$jettyVersion"
+    compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+      exclude group: 'com.sun.jersey'
+    }
+    runtime("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+      exclude group: 'com.sun.jersey'
+    }
+
+    testCompile "junit:junit:$junitVersion"
+    testCompile 
"org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion"
+  }
+
+  tasks.create(name: "releaseRestServiceTar", type: Tar) {
+    description 'Build a tarball containing the samza-rest component and its 
dependencies'
+    compression = Compression.GZIP
+    from(file("$projectDir/src/main/config")) { into "config/" }
+    from(file("$projectDir/src/main/resources/log4j.xml")) { into "bin/" }
+    from(file("$projectDir/src/main/bash/run-samza-rest-service.sh")) { into 
"bin/" }
+    from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into 
"bin/" }
+    from '../LICENSE'
+    from '../NOTICE'
+    from(configurations.runtime) { into("lib/") }
+    from(jar) { into("lib/") }
+  }
+
+  tasks.create(name: "restTarGz", type: Tar) {
+    description 'Build a tarball containing the samza-rest supplementary files'
+    compression = Compression.GZIP
+    from 'src/main/bash'
+    from 'src/main/resources'
+    from(project(':samza-shell').file("src/main/bash/run-class.sh"))
+  }
+
+  artifacts {
+    archives(restTarGz) {
+      name 'samza-rest-scripts'
+      classifier 'dist'
+    }
+  }
+}
+
 project(":samza-test_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 47c71bf..8c757b9 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -18,6 +18,7 @@
  */
  ext {
   elasticsearchVersion = "1.5.1"
+  jerseyVersion = "2.22.1"
   jodaTimeVersion = "2.2"
   joptSimpleVersion = "3.2"
   jacksonVersion = "1.9.13"

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/bash/run-samza-rest-service.sh
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/bash/run-samza-rest-service.sh 
b/samza-rest/src/main/bash/run-samza-rest-service.sh
new file mode 100755
index 0000000..bd52afd
--- /dev/null
+++ b/samza-rest/src/main/bash/run-samza-rest-service.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS 
-Dlog4j.configuration=file:$(dirname $0)/log4j.xml"
+[[ -z "$SAMZA_LOG_DIR" ]] && export SAMZA_LOG_DIR="$PWD/logs"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.rest.SamzaRestService "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/config/samza-rest.properties
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/config/samza-rest.properties 
b/samza-rest/src/main/config/samza-rest.properties
new file mode 100644
index 0000000..7be0b47
--- /dev/null
+++ b/samza-rest/src/main/config/samza-rest.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Service port. Set to 0 for a dynamic port.
+services.rest.port=9139
+
+# JobsResource
+job.proxy.factory.class=org.apache.samza.rest.proxy.job.SimpleYarnJobProxyFactory
+job.installations.path=/export/content/samza/deploy/

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java
new file mode 100644
index 0000000..d69df5f
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+/**
+ * A Monitor is a class implementing some functionality that should be done 
every N milliseconds on a YARN RM or NM.
+ * Classes specified in the config will have their monitor() method called at 
a configurable interval.
+ * For example, one could implement a Monitor that checks for leaked 
containers and kills them, ensuring that
+ * no leaked container survives on a NodeManager host for more than N ms 
(where N is the monitor run interval.)
+ *
+ * Implementations can override .toString() for better logging.
+ */
+public interface Monitor {
+
+    /**
+     * Do the work of the monitor. Because this can be arbitrary behavior up 
to and including script execution,
+     * IPC-related IOExceptions and concurrency-related InterruptedExceptions 
are caught by the SamzaMonitorService.
+     * @throws Exception if there was any problem running the monitor.
+     */
+    void monitor()
+        throws Exception;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
new file mode 100644
index 0000000..75f3867
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import java.lang.reflect.Constructor;
+
+class MonitorLoader {
+
+    private MonitorLoader() {}
+
+    public static Monitor fromClassName(String monitorClassName)
+        throws InstantiationException {
+        Object monitorObject;
+        try {
+            Class<?> klass = Class.forName(monitorClassName);
+            Constructor<?> constructor = klass.getConstructor();
+            monitorObject = constructor.newInstance();
+        } catch (Exception e) {
+            throw (InstantiationException)
+                new InstantiationException("Unable to instantiate " + 
monitorClassName).initCause(e);
+        }
+        if (!(monitorObject instanceof Monitor)) {
+            throw new InstantiationException(monitorClassName + " is not an 
instance of Monitor");
+        }
+        return (Monitor) monitorObject;
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
new file mode 100644
index 0000000..2f4d9dd
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.apache.samza.rest.SamzaRestConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The class responsible for handling long-running/scheduled monitors in Samza 
REST.
+ * Takes a SamzaRestConfig object in the constructor and handles instantiation 
of
+ * monitors and scheduling them to run based on the properties in the config.
+ */
+public class SamzaMonitorService {
+
+    private static final Logger log = 
LoggerFactory.getLogger(SamzaMonitorService.class);
+
+    private final SchedulingProvider scheduler;
+    private final SamzaRestConfig config;
+
+    public SamzaMonitorService(SamzaRestConfig config, SchedulingProvider 
schedulingProvider) {
+        this.scheduler = schedulingProvider;
+        this.config = config;
+    }
+
+    public void start() {
+        List<Monitor> monitors = getMonitorsFromConfig(config);
+        int monitorRunInterval = config.getConfigMonitorIntervalMs();
+        for (Monitor monitor : monitors) {
+            log.debug("Scheduling monitor {} to run every {}ms", monitor, 
monitorRunInterval);
+            this.scheduler.schedule(getRunnable(monitor), monitorRunInterval);
+        }
+    }
+
+    public void stop() {
+        this.scheduler.stop();
+    }
+
+    private Runnable getRunnable(final Monitor monitor) {
+        return new Runnable() {
+            public void run() {
+                try {
+                    monitor.monitor();
+                } catch (IOException e) {
+                    log.warn("Caught IOException during " + monitor.toString() 
+ ".monitor()", e);
+                } catch (InterruptedException e) {
+                    log.warn("Caught InterruptedException during " + 
monitor.toString() + ".monitor()", e);
+                } catch (Exception e) {
+                    log.warn("Unexpected exception during {}.monitor()", 
monitor, e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Get all the registered monitors for the service.
+     * @return a list of Monitor objects ready to be scheduled.
+     */
+    private static List<Monitor> getMonitorsFromConfig(SamzaRestConfig config) 
{
+        List<String> classNames = config.getConfigMonitorClassList();
+        List<Monitor> monitors = new ArrayList<>();
+
+        for (String name: classNames) {
+            try {
+                Monitor monitor = MonitorLoader.fromClassName(name);
+                monitors.add(monitor);
+            } catch (InstantiationException e) {
+                log.warn("Unable to instantiate monitor " + name, e);
+            }
+        }
+        return monitors;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
 
b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
new file mode 100644
index 0000000..c0c448c
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class ScheduledExecutorSchedulingProvider implements SchedulingProvider 
{
+
+    private final ScheduledExecutorService scheduler;
+
+    public ScheduledExecutorSchedulingProvider(ScheduledExecutorService 
scheduler) {
+        this.scheduler = scheduler;
+    }
+
+    public void schedule(Runnable runnable, int interval) {
+        this.scheduler.scheduleAtFixedRate(runnable, 0, interval, 
MILLISECONDS);
+    }
+
+    public void stop() {
+        this.scheduler.shutdownNow();
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
new file mode 100644
index 0000000..aea1a92
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides scheduling functionality to the SamzaMonitorService.
+ */
+public interface SchedulingProvider {
+    /* Schedule a the given Runnable to run() every INTERVAL ms. */
+    void schedule(Runnable runnable, int intervalMs);
+
+    /* Stop any future executions of any scheduled tasks. */
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
new file mode 100644
index 0000000..61f3c46
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.resources.DefaultResourceFactory;
+import org.apache.samza.rest.resources.ResourceFactory;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Samza REST implementation of the JAX-RS {@link 
javax.ws.rs.core.Application} model.
+ */
+public class SamzaRestApplication extends ResourceConfig {
+
+  private static final Logger log = 
LoggerFactory.getLogger(SamzaRestApplication.class);
+
+  public SamzaRestApplication(SamzaRestConfig config) {
+    register(JacksonJsonProvider.class);
+    registerConfiguredResources(config);
+  }
+
+  /**
+   * Registers resources specified in the config. If there are no factories
+   * or resources specified in the config, it uses the
+   * {@link org.apache.samza.rest.resources.DefaultResourceFactory}
+   *
+   * @param config  the config to pass to the factories.
+   */
+  private void registerConfiguredResources(SamzaRestConfig config) {
+    try {
+      // Use default if there were no configured resources or factories
+      if (config.getResourceFactoryClassNames().isEmpty() && 
config.getResourceClassNames().isEmpty()) {
+        log.info("No resource factories or classes configured. Using 
DefaultResourceFactory.");
+        registerInstances(new 
DefaultResourceFactory().getResourceInstances(config).toArray());
+        return;
+      }
+
+      for (String factoryClassName : config.getResourceFactoryClassNames()) {
+        log.info("Invoking factory {}", factoryClassName);
+        registerInstances(instantiateFactoryResources(factoryClassName, 
config).toArray());
+      }
+
+      for (String resourceClassName : config.getResourceClassNames()) {
+        log.info("Using resource class {}", resourceClassName);
+        register(Class.forName(resourceClassName));
+      }
+    } catch (Throwable t) {
+      throw new SamzaException(t);
+    }
+  }
+
+  /**
+   * Passes the specified config to the specified factory to instantiate its 
resources.
+   *
+   * @param factoryClassName  the name of a class that implements {@link 
ResourceFactory}
+   * @param config            the config to pass to the factory
+   * @return                  a collection of resources returned by the 
factory.
+   * @throws InstantiationException
+   */
+  private Collection<? extends Object> instantiateFactoryResources(String 
factoryClassName, Config config)
+      throws InstantiationException {
+    try {
+      Class factoryCls = Class.forName(factoryClassName);
+      ResourceFactory factory = (ResourceFactory) factoryCls.newInstance();
+      return factory.getResourceInstances(config);
+    } catch (Exception e) {
+      throw (InstantiationException)
+          new InstantiationException("Unable to instantiate " + 
factoryClassName).initCause(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java
new file mode 100644
index 0000000..6f5c10a
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+
+
+/**
+ * The set of configurations required by the core components of the {@link 
org.apache.samza.rest.SamzaRestService}.
+ * Other configurations (e.g. from {@link org.apache.samza.config.JobConfig}) 
may also be used by some of the
+ * implementation classes.
+ */
+public class SamzaRestConfig extends MapConfig {
+  /**
+   * Specifies a comma-delimited list of class names that implement 
ResourceFactory.
+   * These factories will be used to create specific instances of resources, 
passing the server config.
+   */
+  public static final String CONFIG_REST_RESOURCE_FACTORIES = 
"rest.resource.factory.classes";
+
+  /**
+   * Specifies a comma-delimited list of class names of resources to register 
with the server.
+   */
+  public static final String CONFIG_REST_RESOURCE_CLASSES = 
"rest.resource.classes";
+
+  /**
+   * Specifies a comma-delimited list of class names corresponding to Monitor 
implementations.
+   * These will be instantiated and scheduled to run periodically at runtime.
+   * Note that you must include the ENTIRE package name (org.apache.samza...).
+   */
+  public static final String CONFIG_MONITOR_CLASSES = "monitor.classes";
+
+  /**
+   * Specifies the interval at which each registered Monitor's monitor method 
will be called.
+   */
+  public static final String CONFIG_MONITOR_INTERVAL_MS = 
"monitor.run.interval.ms";
+
+  /**
+   * Monitors run every 60s by default
+   */
+  private static final int DEFAULT_MONITOR_INTERVAL = 60000;
+
+  /**
+   * The port number to use for the HTTP server or 0 to dynamically choose a 
port.
+   */
+  public static final String CONFIG_SAMZA_REST_SERVICE_PORT = 
"services.rest.port";
+
+  public SamzaRestConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_SAMZA_REST_SERVICE_PORT
+   * @return  the port number to use for the HTTP server or 0 to dynamically 
choose a port.
+   */
+  public int getPort() {
+    return getInt(CONFIG_SAMZA_REST_SERVICE_PORT);
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_REST_RESOURCE_FACTORIES
+   * @return a list of class names as Strings corresponding to factories
+   *          that Samza REST should use to instantiate and register resources
+   *          or an empty list if none were configured.
+   */
+  public List<String> getResourceFactoryClassNames() {
+    return parseCommaDelimitedStrings(get(CONFIG_REST_RESOURCE_FACTORIES));
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_REST_RESOURCE_CLASSES
+   * @return a list of class names as Strings corresponding to resource classes
+   *          that Samza REST should register or an empty list if none were 
configured.
+   */
+  public List<String> getResourceClassNames() {
+    return parseCommaDelimitedStrings(get(CONFIG_REST_RESOURCE_CLASSES));
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_MONITOR_CLASSES
+   * @return a list of class names as Strings corresponding to Monitors that
+   *          Samza REST should schedule or an empty list if none were 
configured.
+   */
+  public List<String> getConfigMonitorClassList() {
+    return parseCommaDelimitedStrings(get(CONFIG_MONITOR_CLASSES));
+  }
+
+  /**
+   * @see SamzaRestConfig#CONFIG_MONITOR_INTERVAL_MS
+   * @return an integer number of milliseconds, the period at which to 
schedule monitor runs.
+   */
+  public int getConfigMonitorIntervalMs() {
+    return getInt(CONFIG_MONITOR_INTERVAL_MS, DEFAULT_MONITOR_INTERVAL);
+  }
+
+  /**
+   * Parses a string containing a set of comma-delimited strings. Whitespace 
is ignored.
+   * If the input string is null or empty, an empty list is returned.
+   *
+   * @param commaDelimitedStrings the string to parse.
+   * @return                      the list of strings parsed from the input or 
an empty list if none.
+   */
+  private static List<String> parseCommaDelimitedStrings(String 
commaDelimitedStrings) {
+    if (commaDelimitedStrings == null || 
commaDelimitedStrings.trim().isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Arrays.asList(commaDelimitedStrings.split("\\s*,\\s*"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
new file mode 100644
index 0000000..5b34da8
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.monitor.SamzaMonitorService;
+import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider;
+import org.apache.samza.util.CommandLine;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Servlet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * The main Java class for the Samza REST API. It runs an embedded Jetty 
server so it can be deployed as a Jar file.
+ *
+ * This class can be started from the command line by providing the 
--config-path parameter to a Samza REST config file
+ * which will be used to configure the default resources exposed by the API.
+ *
+ * It can also be managed programmatically using the
+ * {@link 
org.apache.samza.rest.SamzaRestService#addServlet(javax.servlet.Servlet, 
String)},
+ * {@link #start()} and {@link #stop()} methods.
+ */
+public class SamzaRestService {
+
+  private static final Logger log = 
LoggerFactory.getLogger(SamzaRestService.class);
+
+  private final Server server;
+  private final ServletContextHandler context;
+
+
+  public SamzaRestService(SamzaRestConfig config) {
+    log.info("Creating new SamzaRestService with config: {}", config);
+    server = new Server(config.getPort());
+
+    context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+  }
+
+  /**
+   * Command line interface to run the server.
+   *
+   * @param args arguments supported by {@link 
org.apache.samza.util.CommandLine}.
+   *             In particular, --config-path and --config-factory are used to 
read the Samza REST config file.
+   * @throws Exception if the server could not be successfully started.
+   */
+  public static void main(String[] args)
+      throws Exception {
+    try {
+      SamzaRestConfig config = parseConfig(args);
+      SamzaRestService restService = new SamzaRestService(config);
+
+      // Add applications
+      SamzaRestApplication samzaRestApplication = new 
SamzaRestApplication(config);
+      ServletContainer container = new ServletContainer(samzaRestApplication);
+      restService.addServlet(container, "/*");
+
+      // Schedule monitors to run
+      ScheduledExecutorService schedulingService = 
Executors.newScheduledThreadPool(1);
+      ScheduledExecutorSchedulingProvider schedulingProvider = new 
ScheduledExecutorSchedulingProvider(schedulingService);
+      SamzaMonitorService monitorService = new SamzaMonitorService(config, 
schedulingProvider);
+      monitorService.start();
+
+      restService.runBlocking();
+      monitorService.stop();
+    } catch (Throwable t) {
+      log.error("Exception in main.", t);
+    }
+  }
+
+  /**
+   * Reads a {@link org.apache.samza.config.Config} from command line 
parameters.
+   * @param args  the command line parameters supported by {@link 
org.apache.samza.util.CommandLine}.
+   * @return      the parsed {@link org.apache.samza.config.Config}.
+   */
+  private static SamzaRestConfig parseConfig(String[] args) {
+    CommandLine cmd = new CommandLine();
+    OptionSet options = cmd.parser().parse(args);
+    MapConfig cfg = cmd.loadConfig(options);
+    return new SamzaRestConfig(new MapConfig(cfg));
+  }
+
+  /**
+   * Adds the specified {@link javax.servlet.Servlet} to the server at the 
specified path.
+   * @param servlet the {@link javax.servlet.Servlet} to be added.
+   * @param path    the path for the servlet.
+   */
+  public void addServlet(Servlet servlet, String path) {
+    log.info("Adding servlet {} for path {}", servlet, path);
+    ServletHolder holder = new ServletHolder(servlet);
+    context.addServlet(holder, path);
+    holder.setInitOrder(0);
+  }
+
+  /**
+   * Runs the server and waits for it to finish.
+   *
+   * @throws Exception if the server could not be successfully started.
+   */
+  private void runBlocking()
+      throws Exception {
+    try {
+      start();
+      server.join();
+    } finally {
+      server.destroy();
+      log.info("Server terminated.");
+    }
+  }
+
+  /**
+   * Starts the server asynchronously. To stop the server, see {@link #stop()}.
+   *
+   * @throws Exception if the server could not be successfully started.
+   */
+  public void start()
+      throws Exception {
+    log.info("Starting server on port {}", 
server.getConnectors()[0].getPort());
+    server.start();
+    log.info("Server is running");
+  }
+
+  /**
+   * Stops the server.
+   *
+   * @throws Exception if the server could not be successfully stopped.
+   */
+  public void stop()
+      throws Exception {
+    log.info("Stopping server");
+    server.stop();
+    log.info("Server is stopped");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java 
b/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java
new file mode 100644
index 0000000..e540635
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * The client view of a job. Includes the job name, id, and status.
+ *
+ * The minimum goal is to provide enough information to execute REST commands 
on that job, so the name and id are required.
+ */
+public class Job {
+  private String jobName;
+  private String jobId;
+  private JobStatus status = JobStatus.UNKNOWN; // started, stopped, or 
starting
+  private String statusDetail; // Detailed status e.g. for YARN it could be 
ACCEPTED, RUNNING, etc.
+
+  public Job(String jobName, String jobId) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+  }
+
+  public Job(@JsonProperty("jobName") String jobName, 
@JsonProperty("jobId")String jobId, @JsonProperty("status") JobStatus status, 
@JsonProperty("statusDetail") String statusDetail) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.status = status;
+    this.statusDetail = statusDetail;
+  }
+
+  public Job() {
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(JobStatus status) {
+    this.status = status;
+  }
+
+  public String getStatusDetail() {
+    return statusDetail;
+  }
+
+  public void setStatusDetail(String statusDetail) {
+    this.statusDetail = statusDetail;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java 
b/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java
new file mode 100644
index 0000000..8c655d8
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+/**
+ * The abstract status of the job, irrespective of the status in any 
underlying cluster management system (e.g. YARN).
+ * This status is the client view of the job status.
+ */
+public enum JobStatus {
+
+    /** Job is in the process of starting but is not yet running. */
+    STARTING("starting"),
+
+    /** Job has been started. */
+    STARTED("started"),
+
+    /** Job has been stopped. */
+    STOPPED("stopped"),
+
+    /** Job status is unknown. */
+    UNKNOWN("unknown");
+
+  private final String stringVal;
+
+  JobStatus(final String stringVal) {
+    this.stringVal = stringVal;
+  }
+
+  @Override
+  public String toString() {
+    return stringVal;
+  }
+
+  public boolean hasBeenStarted() {
+    return !(this == STOPPED || this == UNKNOWN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java
new file mode 100644
index 0000000..1ac49ae
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.installation;
+
+import java.util.Map;
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+/**
+ * Finds all the installed jobs. For example, one implementation may take an 
installation root directory
+ * and scan all subdirectories for job installations.
+ *
+ * Provides a map from a {@link JobInstance} to its {@link InstallationRecord} 
based on the structure within the
+ * installation directory. Implementations of this interface should 
encapsulate any custom installation
+ * structure such that the resulting {@link InstallationRecord} simply 
contains the locations of the files
+ * needed to control the job.
+ */
+public interface InstallationFinder {
+
+  /**
+   * @param jobInstance the job to check.
+   * @return            <code>true</code> if a job with the specified name and 
id is installed on the local host.
+   */
+  boolean isInstalled(JobInstance jobInstance);
+
+  /**
+   * @return  a map from each {@link JobInstance} to the corresponding {@link 
InstallationRecord}
+   *          for each Samza installation found in the installRoot.
+   */
+  Map<JobInstance, InstallationRecord> getAllInstalledJobs();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java
new file mode 100644
index 0000000..27b1ab8
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.installation;
+
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+/**
+ * Represents an installation of one Samza job instance on a file system.
+ *
+ * This class does not hard code any knowledge about the structure of a Samza 
job installation. Rather, it
+ * just points to the relevant paths within the installation. The structure is 
resolved by an implementation
+ * of the {@link InstallationFinder} interface.
+ */
+public class InstallationRecord extends JobInstance {
+
+  private final String rootPath;
+  private final String configFilePath;
+  private final String binPath;
+
+  public InstallationRecord(String jobName, String jobId, String rootPath, 
String configFilePath, String binPath) {
+    super(jobName, jobId);
+    this.rootPath = rootPath;
+    this.configFilePath = configFilePath;
+    this.binPath = binPath;
+  }
+
+  /**
+   * @return  the path of the config file for the job.
+   */
+  public String getConfigFilePath() {
+    return configFilePath;
+  }
+
+  /**
+   * @return  the path of the directory containing the scripts.
+   */
+  public String getScriptFilePath() {
+    return binPath;
+  }
+
+  /**
+   * @return  the root path of the installed Samza job on the file system. 
This path may be in common with
+   *          other job instances if, for example, there are multiple configs 
defining separate instances.
+   */
+  public String getRootPath() {
+    return rootPath;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Job %s installed at %s", super.toString(), 
getRootPath());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java
new file mode 100644
index 0000000..0adad5b
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.installation;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A simple default implementation of {@link InstallationFinder}.
+ *
+ * Assumes that one or more Samza jobs are contained in each sub directory of 
the provided installationsPath.
+ * Each sub directory is also expected to contian a bin directory and a config 
directory containing one or
+ * more job config files.
+ */
+public class SimpleInstallationFinder implements InstallationFinder {
+  private static final Logger log = 
LoggerFactory.getLogger(SimpleInstallationFinder.class);
+
+  protected static final String BIN_SUBPATH = "bin";
+  protected static final String CFG_SUBPATH = "config";
+
+  protected final String installationsPath;
+  protected final ConfigFactory jobConfigFactory;
+
+  /**
+   * Required constructor.
+   *
+   * @param installationsPath the root path where all Samza jobs are installed.
+   * @param jobConfigFactory  the {@link ConfigFactory} to use to read the job 
configs.
+   */
+  public SimpleInstallationFinder(String installationsPath, ConfigFactory 
jobConfigFactory) {
+    this.installationsPath = installationsPath;
+    this.jobConfigFactory = jobConfigFactory;
+  }
+
+  @Override
+  public boolean isInstalled(JobInstance jobInstance) {
+    return getAllInstalledJobs().containsKey(jobInstance);
+  }
+
+  @Override
+  public Map<JobInstance, InstallationRecord> getAllInstalledJobs() {
+    Map<JobInstance, InstallationRecord> installations = new HashMap<>();
+    for (File jobInstallPath : new File(installationsPath).listFiles()) {
+      if (!jobInstallPath.isDirectory()) {
+        continue;
+      }
+
+      findJobInstances(jobInstallPath, installations);
+    }
+    return installations;
+  }
+
+  /**
+   * Finds all the job instances in the specified path and adds a 
corresponding {@link JobInstance} and
+   * {@link InstallationRecord} for each instance.
+   *
+   * @param jobInstallPath  the path to search for job instances.
+   * @param jobs            the map to which the job instances will be added.
+   */
+  private void findJobInstances(final File jobInstallPath, final 
Map<JobInstance, InstallationRecord> jobs) {
+    try {
+      String jobInstallCanonPath = jobInstallPath.getCanonicalPath();
+      File configPath = Paths.get(jobInstallCanonPath, CFG_SUBPATH).toFile();
+      if (!(configPath.exists() && configPath.isDirectory())) {
+        log.debug("Config path not found: " + configPath);
+        return;
+      }
+
+      for (File configFile : configPath.listFiles()) {
+
+        if (configFile.isFile()) {
+
+          String configFilePath = configFile.getCanonicalPath();
+          Config config = jobConfigFactory.getConfig(new URI("file://" + 
configFilePath));
+
+          if (config.containsKey(JobConfig.JOB_NAME()) && 
config.containsKey(JobConfig.STREAM_JOB_FACTORY_CLASS())) {
+
+            String jobName = config.get(JobConfig.JOB_NAME());
+            String jobId = config.get(JobConfig.JOB_ID(), "1");
+            JobInstance jobInstance = new JobInstance(jobName, jobId);
+
+            if (jobs.containsKey(jobInstance)) {
+              throw new IllegalStateException(
+                  String.format("Found more than one job config with 
jobName:%s and jobId:%s", jobName, jobId));
+            }
+            InstallationRecord jobInstall =
+                new InstallationRecord(jobName, jobId, jobInstallCanonPath, 
configFilePath, getBinPath(jobInstallCanonPath));
+            jobs.put(jobInstance, jobInstall);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new SamzaException("Exception finding job instance in path: " + 
jobInstallPath, e);
+    }
+  }
+
+  /**
+   * @param jobInstallPath  the root path of the job installation.
+   * @return                the bin directory within the job installation.
+   */
+  private String getBinPath(String jobInstallPath) {
+    return Paths.get(jobInstallPath, BIN_SUBPATH).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
new file mode 100644
index 0000000..bcc88d0
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements a subset of the {@link JobProxy} interface with the default, 
cluster-agnostic,
+ * implementations. Subclasses are expected to override these default methods 
where necessary.
+ */
+public abstract class AbstractJobProxy implements JobProxy {
+  private static final Logger log = 
LoggerFactory.getLogger(AbstractJobProxy.class);
+
+  protected final JobsResourceConfig config;
+
+  /**
+   * Creates a new JobProxy instance from the factory class specified in the 
config.
+   *
+   * @param config  the config containing the job proxy factory property.
+   * @return        the JobProxy produced by the factory.
+   */
+  public static JobProxy fromFactory(JobsResourceConfig config) {
+    String jobProxyFactory = config.getJobProxyFactory();
+    if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
+      try {
+        Class factoryCls = Class.forName(jobProxyFactory);
+        JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance();
+        return factory.getJobProxy(config);
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    } else {
+      throw new SamzaException("Missing config: " + 
JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY);
+    }
+  }
+
+  /**
+   * Required constructor.
+   *
+   * @param config  the config containing the installations path.
+   */
+  public AbstractJobProxy(JobsResourceConfig config) {
+    this.config = config;
+  }
+
+  @Override
+  public List<Job> getAllJobStatuses()
+      throws IOException, InterruptedException {
+    List<Job> allJobs = new ArrayList<>();
+    Collection<JobInstance> jobInstances = getAllJobInstances();
+    for(JobInstance jobInstance : jobInstances) {
+        allJobs.add(new Job(jobInstance.getJobName(), jobInstance.getJobId()));
+    }
+    getJobStatusProvider().getJobStatuses(allJobs);
+
+    return allJobs;
+  }
+
+  /**
+   * Convenience method to get the Samza job status from the name and id.
+   *
+   * @param jobInstance           the instance of the job.
+   * @return                      the current Samza status for the job.
+   * @throws IOException          if there was a problem executing the command 
to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting 
for the status result.
+   */
+  protected JobStatus getJobSamzaStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    return getJobStatus(jobInstance).getStatus();
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    return getJobStatusProvider().getJobStatus(jobInstance);
+  }
+
+  @Override
+  public boolean jobExists(JobInstance jobInstance) {
+    return getAllJobInstances().contains(jobInstance);
+  }
+
+  /**
+   * @return the {@link ConfigFactory} to use to read job configuration files.
+   */
+  protected ConfigFactory getJobConfigFactory() {
+    String configFactoryClassName = 
config.get(JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY);
+    if (configFactoryClassName == null) {
+      configFactoryClassName = 
PropertiesConfigFactory.class.getCanonicalName();
+      log.warn("{} not specified. Defaulting to {}", 
JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName);
+    }
+
+    try {
+      Class factoryCls = Class.forName(configFactoryClassName);
+      return (ConfigFactory) factoryCls.newInstance();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+  /**
+   * @return the {@link JobStatusProvider} to use in retrieving the job status.
+   */
+  protected abstract JobStatusProvider getJobStatusProvider();
+
+  /**
+   * @return all available job instances.
+   */
+  protected abstract Set<JobInstance> getAllJobInstances();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java
new file mode 100644
index 0000000..97599c2
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Allows us to encapsulate the jobName,jobId tuple as one entity.
+ */
+public class JobInstance {
+
+  private final String jobName;
+  private final String jobId;
+
+  /**
+   * Required constructor.
+   *
+   * @param jobName the name of the job.
+   * @param jobId   the id of the job.
+   */
+  public JobInstance(String jobName, String jobId) {
+    this.jobName = Preconditions.checkNotNull(jobName);
+    this.jobId = Preconditions.checkNotNull(jobId);
+  }
+
+  /**
+   * @return  the name of the job.
+   */
+  public String getJobName() {
+    return jobName;
+  }
+
+  /**
+   * @return  the id of the job.
+   */
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 139;
+    int hash = prime * jobName.hashCode() + prime * jobId.hashCode();
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof JobInstance)) {
+      return false;
+    }
+
+    JobInstance otherJob = (JobInstance) other;
+    return this.jobName.equals(otherJob.jobName) && 
this.jobId.equals(otherJob.jobId);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("jobName:%s jobId:%s", jobName, jobId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java
new file mode 100644
index 0000000..7e168d7
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+
+
+/**
+ * Job proxy is the primary abstraction used by the REST API to interact with 
jobs.
+ *
+ * Concrete implementations of this interface may vary in the APIs they use to 
discover, start, stop, and stat jobs,
+ * how to interact with the installed jobs on disk, etc.
+ */
+public interface JobProxy {
+  /**
+   * @param jobInstance the instance of the job
+   * @return            true if the job exists and can be started, stopped, 
etc.
+   */
+  boolean jobExists(JobInstance jobInstance);
+
+  /**
+   * @return                      a {@link Job} for each Samza job instance 
installed on this host.
+   * @throws IOException          if there was a problem executing the command 
to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting 
for the status result.
+   */
+  List<Job> getAllJobStatuses()
+      throws IOException, InterruptedException;
+
+  /**
+   * @param jobInstance           the instance of the job for which the status 
is needed.
+   * @return                      a {@link Job} containing
+   *                              the status for the job specified by jobName 
and jobId.
+   * @throws IOException          if there was a problem executing the command 
to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting 
for the status result.
+   */
+  Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException;
+
+  /**
+   * Starts the job instance specified by jobName and jobId. When this method 
returns, the status of the job
+   * should be {@link JobStatus#STARTING} or
+   * {@link JobStatus#STARTED} depending on the implementation.
+   *
+   * @param jobInstance the instance of the job to start.
+   * @throws Exception  if the job could not be successfully started.
+   */
+  void start(JobInstance jobInstance)
+      throws Exception;
+
+  /**
+   * Stops the job instance specified by jobName and jobId. When this method 
returns, the status of the job
+   * should be {@link JobStatus#STOPPED}.
+   *
+   * @param jobInstance the instance of the job to stop.
+   * @throws Exception  if the job could not be successfully stopped.
+   */
+  void stop(JobInstance jobInstance)
+      throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
new file mode 100644
index 0000000..067711a
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * Simple factory interface to produce instances of {@link JobProxy},
+ * depending on the implementation.
+ *
+ * To use a custom {@link JobProxy}, create an implementation of that 
interface, an implementation
+ * of this interface which instantiates the custom proxy and finally reference 
the custom factory
+ * in the config {@link JobsResourceConfig#CONFIG_JOB_PROXY_FACTORY}.
+ */
+public interface JobProxyFactory {
+
+  /**
+   * Creates a new {@link JobProxy} and initializes it with the specified 
config.
+   *
+   * @param config  the {@link org.apache.samza.rest.SamzaRestConfig} to pass 
to the proxy.
+   * @return        the created proxy.
+   */
+  JobProxy getJobProxy(JobsResourceConfig config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
new file mode 100644
index 0000000..23a7f73
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.samza.rest.model.Job;
+
+
+/**
+ * Interface for getting job status independent of the underlying cluster 
implementation.
+ */
+public interface JobStatusProvider {
+  /**
+   * Populates the status* fields of each {@link Job} in the provided 
Collection.
+   *
+   * @param jobs                  the collection of {@link Job} for which the 
status is needed.
+   * @throws IOException          if there was a problem executing the command 
to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting 
for the status result.
+   */
+  void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException;
+
+  /**
+   * @param jobInstance           the instance of the job.
+   * @return                      a {@link Job} containing
+   *                              the status for the job specified by jobName 
and jobId.
+   * @throws IOException          if there was a problem executing the command 
to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting 
for the status result.
+   */
+  Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
new file mode 100644
index 0000000..2d14366
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.Paths;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.rest.script.ScriptPathProvider;
+import org.apache.samza.rest.script.ScriptRunner;
+
+/**
+ * Extends {@link AbstractJobProxy} with some script support functionality.
+ */
+public abstract class ScriptJobProxy extends AbstractJobProxy implements 
ScriptPathProvider {
+
+  protected final ScriptRunner scriptRunner = new ScriptRunner();
+
+  /**
+   * Required constructor.
+   *
+   * @param config  the config which specifies the path to the Samza framework 
installation.
+   */
+  public ScriptJobProxy(JobsResourceConfig config) {
+    super(config);
+  }
+
+  /**
+   * Constructs the path to the specified script within the job installation.
+   *
+   * @param jobInstance             the instance of the job.
+   * @param scriptName              the name of the script.
+   * @return                        the full path to the script.
+   * @throws FileNotFoundException  if the job installation path doesn't exist.
+   */
+  public String getScriptPath(JobInstance jobInstance, String scriptName)
+      throws FileNotFoundException {
+    String scriptPath;
+    InstallationRecord jobInstallation = 
getInstallationFinder().getAllInstalledJobs().get(jobInstance);
+    scriptPath = Paths.get(jobInstallation.getScriptFilePath(), 
scriptName).toString();
+
+    File scriptFile = new File(scriptPath);
+    if (!scriptFile.exists()) {
+      throw new FileNotFoundException("Script does not exist: " + scriptPath);
+    }
+    return scriptPath;
+  }
+
+  /**
+   * @return the {@link InstallationFinder} which will be used to find jobs 
installed on this machine.
+   */
+  protected abstract InstallationFinder getInstallationFinder();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
new file mode 100644
index 0000000..a935c98
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Extends the {@link ScriptJobProxy} with methods specific to simple Samza 
deployments.
+ */
+public class SimpleYarnJobProxy extends ScriptJobProxy {
+  private static final Logger log = 
LoggerFactory.getLogger(SimpleYarnJobProxy.class);
+
+  private static final String START_SCRIPT_NAME = "run-job.sh";
+  private static final String STOP_SCRIPT_NAME = "kill-yarn-job-by-name.sh";
+
+  private static final String CONFIG_FACTORY_PARAM = 
"--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory";
+  private static final String CONFIG_PATH_PARAM_FORMAT = 
"--config-path=file://%s";
+
+  private final JobStatusProvider statusProvider = new 
YarnCliJobStatusProvider(this);
+
+  private final InstallationFinder installFinder;
+
+  public SimpleYarnJobProxy(JobsResourceConfig config) {
+    super(config);
+
+    installFinder = new 
SimpleInstallationFinder(config.getInstallationsPath(), getJobConfigFactory());
+  }
+
+  @Override
+  public void start(JobInstance jobInstance)
+      throws Exception {
+    JobStatus currentStatus = getJobSamzaStatus(jobInstance);
+    if (currentStatus.hasBeenStarted()) {
+      log.info("Job {} will not be started because it is currently {}.", 
jobInstance, currentStatus.toString());
+      return;
+    }
+
+    String scriptPath = getScriptPath(jobInstance, START_SCRIPT_NAME);
+    int resultCode = scriptRunner.runScript(scriptPath, CONFIG_FACTORY_PARAM,
+        generateConfigPathParameter(jobInstance));
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to start job. Result code: " + 
resultCode);
+    }
+  }
+
+  @Override
+  public void stop(JobInstance jobInstance)
+      throws Exception {
+    JobStatus currentStatus = getJobSamzaStatus(jobInstance);
+    if (!currentStatus.hasBeenStarted()) {
+      log.info("Job {} will not be stopped because it is currently {}.", 
jobInstance, currentStatus.toString());
+      return;
+    }
+
+    String scriptPath = getScriptPath(jobInstance, STOP_SCRIPT_NAME);
+    int resultCode = scriptRunner.runScript(scriptPath, 
YarnCliJobStatusProvider.getQualifiedJobName(jobInstance));
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to stop job. Result code: " + 
resultCode);
+    }
+  }
+
+  /**
+   * Generates the command line argument which specifies the path to the 
config file for the job.
+   *
+   * @param jobInstance the instance of the job.
+   * @return            the --config-path command line argument.
+   */
+  private String generateConfigPathParameter(JobInstance jobInstance) {
+    InstallationRecord record = 
installFinder.getAllInstalledJobs().get(jobInstance);
+    return String.format(CONFIG_PATH_PARAM_FORMAT, record.getConfigFilePath());
+  }
+
+  /**
+   * @return the {@link JobStatusProvider} to use for retrieving job status.
+   */
+  public JobStatusProvider getJobStatusProvider() {
+    return statusProvider;
+  }
+
+  @Override
+  protected Set<JobInstance> getAllJobInstances() {
+    return installFinder.getAllInstalledJobs().keySet();
+  }
+
+  @Override
+  protected InstallationFinder getInstallationFinder() {
+    return installFinder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
new file mode 100644
index 0000000..11d93d4
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * Factory to produce SimpleJobProxy instances.
+ *
+ * See {@link 
AbstractJobProxy#fromFactory(org.apache.samza.rest.resources.JobsResourceConfig)}
+ */
+public class SimpleYarnJobProxyFactory implements JobProxyFactory {
+
+  @Override
+  public JobProxy getJobProxy(JobsResourceConfig config) {
+    return new SimpleYarnJobProxy(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
new file mode 100644
index 0000000..d1f34e8
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.script.ScriptOutputHandler;
+import org.apache.samza.rest.script.ScriptPathProvider;
+import org.apache.samza.rest.script.ScriptRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the {@link JobStatusProvider} that retrieves
+ * the job status from the YARN command line interface.
+ */
+public class YarnCliJobStatusProvider implements JobStatusProvider {
+  private static final Logger log = 
LoggerFactory.getLogger(YarnCliJobStatusProvider.class);
+  private static final String JOB_NAME_ID_FORMAT = "%s_%s";
+  private final ScriptPathProvider scriptPathProvider;
+
+  /**
+   * Constructs the job name used in YARN. This is the value shown in the 
"Name"
+   * column of the Resource Manager UI.
+   *
+   * @param jobInstance the instance of the job.
+   * @return            the job name to use for the job in YARN.
+   */
+  public static String getQualifiedJobName(JobInstance jobInstance) {
+    return String.format(JOB_NAME_ID_FORMAT, jobInstance.getJobName(), 
jobInstance.getJobId());
+  }
+
+  /**
+   * Default constructor.
+   *
+   * @param provider a delegate that provides the path to the Samza yarn 
scripts.
+   */
+  public YarnCliJobStatusProvider(ScriptPathProvider provider) {
+    scriptPathProvider = provider;
+  }
+
+  @Override
+  public void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException {
+    if (jobs == null || jobs.isEmpty()) {
+      return;
+    }
+
+    // If the scripts are in the jobs, they will be in all job installations, 
so just pick one and get the script path.
+    Job anyJob = jobs.iterator().next();
+    String scriptPath = scriptPathProvider.getScriptPath(new 
JobInstance(anyJob.getJobName(), anyJob.getJobId()), "run-class.sh");
+
+    // We will identify jobs returned by the YARN application states by their 
qualified names, so build a map
+    // to translate back from that name to the JobInfo we wish to populate. 
This avoids parsing/delimiter issues.
+    final Map<String, Job> qualifiedJobToInfo = new HashMap<>();
+    for(Job job : jobs) {
+      qualifiedJobToInfo.put(getQualifiedJobName(new 
JobInstance(job.getJobName(), job.getJobId())), job);
+    }
+
+    // Run "application -list" command and get the YARN state for each 
application
+    ScriptRunner runner = new ScriptRunner();
+    int resultCode = runner.runScript(scriptPath, new ScriptOutputHandler() {
+      @Override
+      public void processScriptOutput(InputStream output)
+          throws IOException {
+        InputStreamReader isr = new InputStreamReader(output);
+        BufferedReader br = new BufferedReader(isr);
+        String line;
+        String APPLICATION_PREFIX = "application_";
+        log.debug("YARN status:");
+        while ((line = br.readLine()) != null) {
+          log.debug(line);
+          if (line.startsWith(APPLICATION_PREFIX)) {
+            String[] columns = line.split("\\s+");
+            String qualifiedName = columns[1];
+            String yarnState = columns[5];
+
+            JobStatus samzaStatus = 
yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnState.toUpperCase()));
+            Job job = qualifiedJobToInfo.get(qualifiedName);
+
+            // If job is null, it wasn't requested.  The default status is 
STOPPED because there could be many
+            // application attempts in that status. Only update the job status 
if it's not STOPPED.
+            if (job != null && (job.getStatusDetail() == null || samzaStatus 
!= JobStatus.STOPPED)) {
+              job.setStatusDetail(yarnState);
+              job.setStatus(samzaStatus);
+            }
+          }
+        }
+      }
+    }, "org.apache.hadoop.yarn.client.cli.ApplicationCLI", "application", 
"-list", "-appStates", "ALL");
+
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to get job status. Result code: " + 
resultCode);
+    }
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
+    getJobStatuses(Collections.singletonList(info));
+    return info;
+  }
+
+  /**
+   * Translates the YARN application state to the more generic Samza job 
status.
+   *
+   * @param yarnState the YARN application state to translate.
+   * @return          the corresponding Samza job status.
+   */
+  private JobStatus yarnStateToSamzaStatus(YarnApplicationState yarnState) {
+    switch (yarnState) {
+      case RUNNING:
+        return JobStatus.STARTED;
+      case NEW:
+      case NEW_SAVING:
+      case SUBMITTED:
+      case ACCEPTED:
+        return JobStatus.STARTING;
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+      default:
+        return JobStatus.STOPPED;
+    }
+  }
+}

Reply via email to