Repository: samza
Updated Branches:
  refs/heads/master 17e65d1cb -> 260d1ff96


http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
new file mode 100644
index 0000000..e0224c6
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.SamzaRestConfig;
+
+
+/**
+ * Instantiates all the resources that are shipped with the REST service.
+ */
+public class DefaultResourceFactory implements ResourceFactory {
+  @Override
+  public List<? extends Object> getResourceInstances(Config config) {
+    return Collections.singletonList(new JobsResource(new 
JobsResourceConfig(config)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
new file mode 100644
index 0000000..a566db5
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.Collections;
+import javax.inject.Singleton;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.proxy.job.AbstractJobProxy;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.proxy.job.JobProxyFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The REST resource for jobs. Handles all requests for the jobs collection
+ * or individual job instances.
+ */
+@Singleton
+@Path("/v1/jobs")
+public class JobsResource {
+  private static final Logger log = 
LoggerFactory.getLogger(JobsResource.class);
+
+  /** The primary interface for interacting with jobs. */
+  private final JobProxy jobProxy;
+
+  /**
+   * Initializes a JobResource with {@link JobProxy} from the
+   * {@link JobProxyFactory} class specified in the configuration.
+   *
+   * @param config  the configuration containing the {@link JobProxyFactory} 
class.
+   */
+  public JobsResource(JobsResourceConfig config) {
+    jobProxy = AbstractJobProxy.fromFactory(config);
+  }
+
+  /**
+   * Gets the {@link Job} for all the jobs installed on this host.
+   *
+   * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link 
javax.ws.rs.core.Response} containing a list of
+   * {@link Job} for all the installed Samza jobs installed on this host.
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getInstalledJobs() {
+    try {
+      return Response.ok(jobProxy.getAllJobStatuses()).build();
+    } catch (Exception e) {
+      log.error("Error in getInstalledJobs.", e);
+      return errorResponse(e.getMessage());
+    }
+  }
+
+  /**
+   * Gets the {@link Job} for the job instance specified by jobName and jobId 
if
+   * it is installed on this host.
+   *
+   * @param jobName the name of the job as configured in {@link 
org.apache.samza.config.JobConfig#JOB_NAME}.
+   * @param jobId   the id of the job as configured in {@link 
org.apache.samza.config.JobConfig#JOB_ID}.
+   * @return        a {@link javax.ws.rs.core.Response.Status#OK} {@link 
javax.ws.rs.core.Response}
+   *                containing a {@link Job} for the Samza job if it is
+   *                installed on this host. {@link 
javax.ws.rs.core.Response.Status#NOT_FOUND} and
+   *                {@link 
javax.ws.rs.core.Response.Status#INTERNAL_SERVER_ERROR} can occur for 
corresponding errors.
+   */
+  @GET
+  @Path("/{jobName}/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getJob(
+      @PathParam("jobName") final String jobName,
+      @PathParam("jobId") final String jobId) {
+    JobInstance jobInstance = new JobInstance(jobName, jobId);
+    try {
+      if (!jobProxy.jobExists(jobInstance)) {
+        return 
Response.status(Response.Status.NOT_FOUND).entity(Collections.singletonMap("message",
+            String.format("%s does not exist.", jobInstance))).build();
+      }
+
+      Job job = jobProxy.getJobStatus(jobInstance);
+      return Response.ok(job).build();
+    } catch (Exception e) {
+      log.error("Error in getJob.", e);
+      return errorResponse(e.getMessage());
+    }
+  }
+
+  /**
+   *
+   * @param jobName the name of the job as configured in {@link 
org.apache.samza.config.JobConfig#JOB_NAME}.
+   * @param jobId   the id of the job as configured in {@link 
org.apache.samza.config.JobConfig#JOB_ID}.
+   * @param status   the {@link JobStatus} to which the job will transition.
+   * @return        a {@link javax.ws.rs.core.Response.Status#ACCEPTED} {@link 
javax.ws.rs.core.Response}
+   *                containing a {@link Job} for the Samza job if it is
+   *                installed on this host. {@link 
javax.ws.rs.core.Response.Status#NOT_FOUND}
+   *                {@link javax.ws.rs.core.Response.Status#BAD_REQUEST} and
+   *                {@link 
javax.ws.rs.core.Response.Status#INTERNAL_SERVER_ERROR} can occur for 
corresponding errors.
+   */
+  @PUT
+  @Path("/{jobName}/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateJobStatus(
+      @PathParam("jobName") final String jobName,
+      @PathParam("jobId") final String jobId,
+      @QueryParam("status") String status) {
+    JobInstance jobInstance = new JobInstance(jobName, jobId);
+    try {
+      if (!jobProxy.jobExists(jobInstance)) {
+        return Response.status(Response.Status.NOT_FOUND).entity(Collections
+            .singletonMap("message", String.format("Job %s instance %s is not 
installed on this host.", jobName, jobId))).build();
+      }
+
+      if (status == null) {
+        throw new IllegalArgumentException("Unrecognized status parameter: " + 
status);
+      }
+
+      JobStatus samzaStatus = JobStatus.valueOf(status.toUpperCase());
+      switch (samzaStatus) {
+        case STARTED:
+          log.info("Starting {}", jobInstance);
+          jobProxy.start(jobInstance);
+          Job infoStarted = jobProxy.getJobStatus(jobInstance);
+          return Response.accepted(infoStarted).build();
+        case STOPPED:
+          log.info("Stopping {}", jobInstance);
+          jobProxy.stop(jobInstance);
+          Job infoStopped = jobProxy.getJobStatus(jobInstance);
+          return Response.accepted(infoStopped).build();
+        default:
+          throw new IllegalArgumentException("Unsupported status: " + status);
+      }
+    } catch (IllegalArgumentException e) {
+      log.info(String.format("Illegal arguments updateJobStatus. JobName:%s 
JobId:%s Status=%s", jobName, jobId, status), e);
+      return Response.status(Response.Status.BAD_REQUEST).entity(
+          Collections.singletonMap("message", e.getMessage())).build();
+    } catch (Exception e) {
+      log.error("Error in updateJobStatus.", e);
+      return errorResponse(String.format("Error type: %s message: %s", 
e.toString(), e.getMessage()));
+    }
+  }
+
+  /**
+   * Constructs a consistent format for error responses. This method should be 
used for every error case.
+   *
+   * @param message the error message to report.
+   * @return        the {@link Response} containing the error message.
+   */
+  private Response errorResponse(String message) {
+    return Response.serverError().entity(Collections.singletonMap("message", 
message)).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
new file mode 100644
index 0000000..527482d
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.proxy.job.JobProxyFactory;
+
+
+/**
+ * Configurations for the {@link JobsResource} endpoint.
+ */
+public class JobsResourceConfig extends MapConfig {
+  /**
+   * Specifies the canonical name of the {@link JobProxyFactory} class to 
produce
+   * {@link JobProxy} instances.
+   *
+   * To use your own proxy, implement the factory and specify the class for 
this config.
+   */
+  public static final String CONFIG_JOB_PROXY_FACTORY = 
"job.proxy.factory.class";
+
+  /**
+   * The path where all the Samza jobs are installed (unzipped). Each 
subdirectory of this path
+   * is expected to be a Samza job installation and corresponds to one {@link 
InstallationRecord}.
+   */
+  public static final String CONFIG_JOB_INSTALLATIONS_PATH = 
"job.installations.path";
+
+  /**
+   * Specifies the canonical name of the {@link 
org.apache.samza.config.ConfigFactory} to read the job configs.
+   */
+  public static final String CONFIG_JOB_CONFIG_FACTORY = 
"job.config.factory.class";
+
+  public JobsResourceConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @see JobsResourceConfig#CONFIG_JOB_CONFIG_FACTORY
+   * @return the canonical name of the {@link JobProxyFactory} class to 
produce {@link JobProxy} instances.
+   */
+  public String getJobProxyFactory() {
+    return get(CONFIG_JOB_PROXY_FACTORY);
+  }
+
+  /**
+   * @see JobsResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
+   * @return the path where all the Samza jobs are installed (unzipped).
+   */
+  public String getInstallationsPath() {
+    return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH));
+  }
+
+  /**
+   * Ensures a usable file path when the user specifies a tilde for the home 
path.
+   *
+   * @param rawPath the original path.
+   * @return        the updated path with the tilde resolved to home.
+   */
+  private static String sanitizePath(String rawPath) {
+    if (rawPath == null) {
+      return null;
+    }
+    return rawPath.replaceFirst("^~", System.getProperty("user.home"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java
new file mode 100644
index 0000000..be83eb6
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.List;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Instantiates a resource using the provided config.
+ *
+ * This is used to instantiate and register a specific instance of the object 
rather than registering the class.
+ */
+public interface ResourceFactory {
+
+  /**
+   * Constructs and returns resource instances to register with the server.
+   *
+   * @param config  the server config used to initialize the objects.
+   * @return        a collection of instances to register with the server.
+   */
+  List<? extends Object> getResourceInstances(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java
 
b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java
new file mode 100644
index 0000000..af3bf0a
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.script;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * A script output handler processes the stream of output from the stdout and 
stderr channels of a script.
+ */
+public interface ScriptOutputHandler {
+
+  /**
+   * Processes the script output represented by the InputStream.
+   *
+   * Implementations must fully process the stream or the script may hang.
+   *
+   * @param output the stream of output from the script.
+   * @throws IOException if there are problems reading the output.
+   */
+  void processScriptOutput(InputStream output)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java 
b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java
new file mode 100644
index 0000000..dbb9849
--- /dev/null
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.script;
+
+import java.io.FileNotFoundException;
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+/**
+ * Defines the protocol for getting script paths.
+ */
+public interface ScriptPathProvider {
+  /**
+   * @param jobInstance             the job instance which may be used to 
access the job installation for the script.
+   * @param scriptName              the name of the script file. Not the full 
path.
+   * @return                        the full path to the specified script.
+   * @throws FileNotFoundException  if the script does not exist.
+   */
+  String getScriptPath(JobInstance jobInstance, String scriptName)
+      throws FileNotFoundException;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java 
b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
new file mode 100644
index 0000000..b70a0f1
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.script;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Runs a script process and returns the exit code.
+ *
+ * The script can be run with an output handler or with output redirected to 
console.
+ */
+public class ScriptRunner {
+  private static final Logger log = 
LoggerFactory.getLogger(ScriptRunner.class);
+  private static final int DEFAULT_SCRIPT_CMD_TIMEOUT_S = 30;
+  private int scriptTimeout = DEFAULT_SCRIPT_CMD_TIMEOUT_S;
+
+  protected long getScriptTimeoutS() {
+    return scriptTimeout;
+  }
+
+  /**
+   * Runs a script with IO inherited from the current Java process. Typically 
this redirects to console.
+   *
+   * @param scriptPath            the path to the script file.
+   * @param args                  the command line args to pass to the script.
+   * @return                      the exit code returned by the script.
+   * @throws IOException          if there was a problem running the process.
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the process to finish.
+   */
+  public int runScript(String scriptPath, String... args)
+      throws IOException, InterruptedException {
+    ProcessBuilder processBuilder = getProcessBuilder(scriptPath, args);
+    Process p = processBuilder.inheritIO().start();
+
+    return waitForExitValue(p);
+  }
+
+  /**
+   * @param scriptPath            the path to the script file.
+   * @param outputHandler         the handler for any stdout and stderr 
produced by the script.
+   * @param args                  the command line args to pass to the script.
+   * @return                      the exit code returned by the script.
+   * @throws IOException          if there was a problem running the process.
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the process to finish.
+   */
+  public int runScript(String scriptPath, ScriptOutputHandler outputHandler, 
String... args)
+      throws IOException, InterruptedException {
+    ProcessBuilder processBuilder = getProcessBuilder(scriptPath, args);
+    Process p = processBuilder.redirectErrorStream(true).start();
+
+    InputStream output = p.getInputStream();
+    outputHandler.processScriptOutput(output);
+
+    return waitForExitValue(p);
+  }
+
+  /**
+   * @param scriptPath  the path to the script file.
+   * @param args        the command line args to pass to the script.
+   * @return            a {@link java.lang.ProcessBuilder} for the script and 
args.
+   */
+  private ProcessBuilder getProcessBuilder(String scriptPath, String[] args) {
+    List<String> command = new ArrayList<>(args.length + 1);
+    command.add(scriptPath);
+    command.addAll(Arrays.asList(args));
+
+    log.debug("Building process with command {}", command);
+    return new ProcessBuilder(command);
+  }
+
+  /**
+   * Waits for a finite time interval for the script to complete.
+   *
+   * @param p                     the process on which this method will wait.
+   * @return                      the exit code returned by the process.
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the process to finish.
+   */
+  private int waitForExitValue(final Process p)
+      throws InterruptedException {
+    log.debug("Waiting for the exit value for process {}", p);
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          p.waitFor();
+        } catch (InterruptedException ignore) {
+          return;
+        }
+      }
+    });
+
+    t.start();
+    try {
+      t.join(TimeUnit.MILLISECONDS.convert(getScriptTimeoutS(), 
TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new SamzaException("Timeout running shell command", e);
+    }
+
+    int exitVal = p.exitValue();
+    log.debug("Exit value {}", exitVal);
+    return exitVal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/resources/log4j.xml 
b/samza-rest/src/main/resources/log4j.xml
new file mode 100644
index 0000000..c5c1556
--- /dev/null
+++ b/samza-rest/src/main/resources/log4j.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+  <appender name="RollingAppender" 
class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/samza-rest-service.log" />
+    <param name="MaxFileSize" value="256MB" />
+    <param name="MaxBackupIndex" value="20" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] 
%c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+
+  <logger name="org.apache.hadoop">
+    <level value="off" />
+  </logger>
+
+  <root>
+    <priority value="info" />
+    <appender-ref ref="RollingAppender"/>
+  </root>
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java 
b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
new file mode 100644
index 0000000..1da3430
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.monitor.mock.ExceptionThrowingMonitor;
+import org.apache.samza.monitor.mock.InstantSchedulingProvider;
+import org.apache.samza.rest.SamzaRestConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestMonitorService {
+
+    @Test
+    public void testGetMonitorsFromClassName() {
+        // Test that monitors are instantiated properly from config strings.
+        Monitor monitor = null;
+        try {
+            monitor = 
MonitorLoader.fromClassName("org.apache.samza.monitor.mock.DummyMonitor");
+        } catch (InstantiationException e) {
+            fail();
+        }
+
+        // Object should implement monitor().
+        try {
+            monitor.monitor();
+        } catch (Exception e) {
+            fail();
+        }
+    }
+
+    @Test
+    public void testMonitorExceptionIsolation() {
+        // Test that an exception from a monitor doesn't bubble up out of the 
scheduler.
+        Monitor monitor = new ExceptionThrowingMonitor();
+        InstantSchedulingProvider provider = new InstantSchedulingProvider();
+
+        // Initialize with a monitor that immediately throws an exception when 
run.
+        Map<String, String> map = new HashMap<>();
+        map.put(SamzaRestConfig.CONFIG_MONITOR_CLASSES, 
"org.apache.samza.monitor.mock.ExceptionThrowingMonitor");
+        map.put(SamzaRestConfig.CONFIG_MONITOR_INTERVAL_MS, "1");
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config, 
provider);
+
+        // This will throw if the exception isn't caught within the provider.
+        monitorService.start();
+        monitorService.stop();
+    }
+
+    @Test
+    public void testScheduledExecutorSchedulingProvider() {
+        // Test that the monitor is scheduled by the 
ScheduledExecutorSchedulingProvider
+        ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
+        ScheduledExecutorSchedulingProvider provider =
+                new ScheduledExecutorSchedulingProvider(executorService);
+
+        // notifyingMonitor.monitor() should be called repeatedly.
+        final CountDownLatch wasCalledLatch = new CountDownLatch(3);
+
+        final Monitor notifyingMonitor = new Monitor() {
+            @Override
+            public void monitor() {
+                wasCalledLatch.countDown();
+            }
+        };
+
+        Runnable runnableMonitor = new Runnable() {
+            public void run() {
+                try {
+                    notifyingMonitor.monitor();
+                } catch (Exception e) {
+                    // Must be caught because they are checked in monitor()
+                    fail();
+                }
+            }
+        };
+
+        // monitor should get called every 1ms, so if await() misses the first 
call, there will be more.
+        provider.schedule(runnableMonitor, 1);
+
+        try {
+            assertTrue(wasCalledLatch.await(5l, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            executorService.shutdownNow();
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java 
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java
new file mode 100644
index 0000000..8621db1
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.monitor.Monitor;
+
+public class DummyMonitor implements Monitor {
+
+    public void monitor() {
+        // Do nothing!
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java
 
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java
new file mode 100644
index 0000000..c4f3f73
--- /dev/null
+++ 
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.monitor.Monitor;
+
+import java.io.IOException;
+
+public class ExceptionThrowingMonitor implements Monitor {
+    public void monitor() throws IOException {
+        throw new IOException("I don't know what I was expecting.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
 
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
new file mode 100644
index 0000000..6ae80e6
--- /dev/null
+++ 
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.monitor.SchedulingProvider;
+
+/**
+ * Instead of scheduling a monitor to run, just runs it ASAP.
+ */
+public class InstantSchedulingProvider implements SchedulingProvider {
+
+    public void schedule(Runnable runnableMonitor, int interval) {
+        runnableMonitor.run();
+    }
+
+    // Nothing to stop because no deferred task was started
+    public void stop() {}
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
new file mode 100644
index 0000000..7db437b
--- /dev/null
+++ 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Form;
+import javax.ws.rs.core.Response;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.SamzaRestApplication;
+import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.mock.MockJobProxy;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobsResource extends JerseyTest {
+  ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper();
+
+  @Override
+  protected Application configure() {
+    Map<String, String> map = new HashMap<>();
+    map.put(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, 
"org.apache.samza.rest.resources.mock.MockJobProxyFactory");
+    map.put(JobsResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH, ".");
+    SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
+    return new SamzaRestApplication(config);
+  }
+
+  @Test
+   public void testGetJobs()
+      throws IOException {
+
+    Response resp = target("v1/jobs").request().get();
+    assertEquals(200, resp.getStatus());
+    final Job[] jobs = objectMapper.readValue(resp.readEntity(String.class), 
Job[].class);
+    assertEquals(4, jobs.length);
+
+    assertEquals(MockJobProxy.JOB_INSTANCE_1_NAME, jobs[0].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_1_ID, jobs[0].getJobId());
+    assertStatusNotDefault(jobs[0]);
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, jobs[1].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, jobs[1].getJobId());
+    assertStatusNotDefault(jobs[1]);
+    assertEquals(MockJobProxy.JOB_INSTANCE_3_NAME, jobs[2].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_3_ID, jobs[2].getJobId());
+    assertStatusNotDefault(jobs[2]);
+    assertEquals(MockJobProxy.JOB_INSTANCE_4_NAME, jobs[3].getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_4_ID, jobs[3].getJobId());
+    assertStatusNotDefault(jobs[3]);
+    resp.close();
+  }
+
+  @Test
+   public void testPostJobs()
+      throws IOException {
+    Response resp = target("v1/jobs").request().post(Entity.text(""));
+    assertEquals(405, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testPutJobs()
+      throws IOException {
+    Response resp = target("v1/jobs").request().put(Entity.text(""));
+    assertEquals(405, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testGetJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, 
MockJobProxy.JOB_INSTANCE_2_ID)).request().get();
+    assertEquals(200, resp.getStatus());
+    final Job job2 = objectMapper.readValue(resp.readEntity(String.class), 
Job.class);
+
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId());
+    assertStatusNotDefault(job2);
+    resp.close();
+  }
+
+  @Test
+  public void testPostJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, 
MockJobProxy.JOB_INSTANCE_2_ID)).request().post(
+        Entity.text(""));
+    assertEquals(405, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testGetJobNameNotFound()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", "BadJobName", 
MockJobProxy.JOB_INSTANCE_2_ID)).request().get();
+    assertEquals(404, resp.getStatus());
+
+    final Map<String, String> errorMessage = 
objectMapper.readValue(resp.readEntity(String.class), new 
TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message"), 
errorMessage.get("message").contains("does not exist"));
+    resp.close();
+  }
+
+  @Test
+  public void testGetJobIdNotFound()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, "BadJobId")).request().get();
+    assertEquals(404, resp.getStatus());
+
+    final Map<String, String> errorMessage = 
objectMapper.readValue(resp.readEntity(String.class), new 
TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message"), 
errorMessage.get("message").contains("does not exist"));
+    resp.close();
+  }
+
+  @Test
+  public void testGetJobNameWithoutId()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME)).request().get();
+    assertEquals(404, resp.getStatus());
+    resp.close();
+  }
+
+  @Test
+  public void testStartJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID))
+        .queryParam("status", "started").request().put(Entity.form(new 
Form()));
+    assertEquals(202, resp.getStatus());
+
+    final Job job2 = objectMapper.readValue(resp.readEntity(String.class), 
Job.class);
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId());
+    assertStatusNotDefault(job2);
+    resp.close();
+  }
+
+  @Test
+  public void testStopJob()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID))
+        .queryParam("status", "stopped").request().put(Entity.form(new 
Form()));
+    assertEquals(202, resp.getStatus());
+
+    final Job job2 = objectMapper.readValue(resp.readEntity(String.class), 
Job.class);
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName());
+    assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId());
+    assertStatusNotDefault(job2);
+    resp.close();
+  }
+
+  @Test
+  public void testPutBadJobStatus()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID))
+        .queryParam("status", "BADSTATUS").request().put(Entity.form(new 
Form()));
+    assertEquals(400, resp.getStatus());
+
+    final Map<String, String> errorMessage = 
objectMapper.readValue(resp.readEntity(String.class), new 
TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message").contains("BADSTATUS"));
+    resp.close();
+  }
+
+  @Test
+  public void testPutMissingStatus()
+      throws IOException {
+    Response resp = target(String.format("v1/jobs/%s/%s", 
MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request()
+        .put(Entity.form(new Form()));
+    assertEquals(400, resp.getStatus());
+
+    final Map<String, String> errorMessage = 
objectMapper.readValue(resp.readEntity(String.class), new 
TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message").contains("status"));
+    resp.close();
+  }
+
+  private void assertStatusNotDefault(Job job)  {
+    // Job status should be populated, not the defaults.
+    // We're not testing whether it value matches a specific value here,
+    // just that the value reflects whatever the JobStatusProvider returns.
+    assertFalse(JobStatus.UNKNOWN == job.getStatus());
+    assertNotNull(job.getStatusDetail());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java
 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java
new file mode 100644
index 0000000..96f8db5
--- /dev/null
+++ 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.samza.rest.proxy.job.AbstractJobProxy;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobStatusProvider;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+public class MockJobProxy extends AbstractJobProxy {
+
+  public static final String JOB_INSTANCE_1_NAME = "Job1";
+  public static final String JOB_INSTANCE_1_ID = "i001";
+
+  public static final String JOB_INSTANCE_2_NAME = "Job1";
+  public static final String JOB_INSTANCE_2_ID = "i002";
+
+  public static final String JOB_INSTANCE_3_NAME = "Job2";
+  public static final String JOB_INSTANCE_3_ID = "i001";
+
+  public static final String JOB_INSTANCE_4_NAME = "Job3";
+  public static final String JOB_INSTANCE_4_ID = "1";
+  /**
+   * Required constructor.
+   *
+
+   * @param config  the config containing the installations path.
+   */
+  public MockJobProxy(JobsResourceConfig config) {
+    super(config);
+  }
+
+  @Override
+  protected JobStatusProvider getJobStatusProvider() {
+    return new MockJobStatusProvider();
+  }
+
+  @Override
+  protected Set<JobInstance> getAllJobInstances() {
+    Set<JobInstance> validatedInstallations = new LinkedHashSet<>();
+
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_1_NAME, 
JOB_INSTANCE_1_ID));
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_2_NAME, 
JOB_INSTANCE_2_ID));
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_3_NAME, 
JOB_INSTANCE_3_ID));
+
+    validatedInstallations.add(new JobInstance(JOB_INSTANCE_4_NAME, 
JOB_INSTANCE_4_ID));
+
+    return validatedInstallations;
+  }
+
+  @Override
+  public void start(JobInstance jobInstance)
+      throws Exception {
+
+  }
+
+  @Override
+  public void stop(JobInstance jobInstance)
+      throws Exception {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java
 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java
new file mode 100644
index 0000000..03e95b1
--- /dev/null
+++ 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.proxy.job.JobProxyFactory;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+public class MockJobProxyFactory implements JobProxyFactory{
+  @Override
+  public JobProxy getJobProxy(JobsResourceConfig config) {
+    return new MockJobProxy(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java
 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java
new file mode 100644
index 0000000..df0f18a
--- /dev/null
+++ 
b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobStatusProvider;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+
+
+public class MockJobStatusProvider implements JobStatusProvider {
+  @Override
+  public void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException {
+     for (Job info : jobs) {
+       setStatusStarted(info);
+     }
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
+    setStatusStarted(info);
+    return info;
+  }
+
+  private void setStatusStarted(Job info) {
+    info.setStatus(JobStatus.STARTED);
+    info.setStatusDetail("RUNNING");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-shell/src/main/bash/kill-yarn-job-by-name.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/kill-yarn-job-by-name.sh 
b/samza-shell/src/main/bash/kill-yarn-job-by-name.sh
new file mode 100755
index 0000000..06eacc7
--- /dev/null
+++ b/samza-shell/src/main/bash/kill-yarn-job-by-name.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS 
-Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+# Get the id for the app with the specified name that is also ACCEPTED or 
RUNNING status
+APP_ID=$(exec "$(dirname $0)"/run-class.sh 
org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list | grep 
"[[:space:]]$1[[:space:]]" | grep "application_" | awk -F ' ' '{ print $1 }')
+echo "Job name ($1) matched app IDs: ($APP_ID)"
+
+# If the app id was not found, it either doesn't exist or was already stopped.
+if [ -z "$APP_ID" ];
+then
+  exit 0
+fi
+
+# Verify that only one application matches
+COUNT=$(echo "$APP_ID" | wc -l)
+if [ $COUNT -gt 1 ];
+then
+  exit 150
+fi
+
+# Kill the job and check the return code
+"$(dirname $0)"/kill-yarn-job.sh "$APP_ID"

http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 4c1aa10..6ea62b4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,7 +20,8 @@ include \
   'samza-api',
   'samza-elasticsearch',
   'samza-log4j',
-  'samza-shell'
+  'samza-shell',
+  'samza-rest'
 
 def scalaModules = [
         'samza-core',

Reply via email to