Repository: samza Updated Branches: refs/heads/master 17e65d1cb -> 260d1ff96
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java new file mode 100644 index 0000000..e0224c6 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources; + +import java.util.Collections; +import java.util.List; +import org.apache.samza.config.Config; +import org.apache.samza.rest.SamzaRestConfig; + + +/** + * Instantiates all the resources that are shipped with the REST service. + */ +public class DefaultResourceFactory implements ResourceFactory { + @Override + public List<? 
extends Object> getResourceInstances(Config config) { + return Collections.singletonList(new JobsResource(new JobsResourceConfig(config))); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java new file mode 100644 index 0000000..a566db5 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources; + +import java.util.Collections; +import javax.inject.Singleton; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.samza.rest.model.Job; +import org.apache.samza.rest.model.JobStatus; +import org.apache.samza.rest.proxy.job.AbstractJobProxy; +import org.apache.samza.rest.proxy.job.JobInstance; +import org.apache.samza.rest.proxy.job.JobProxy; +import org.apache.samza.rest.proxy.job.JobProxyFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The REST resource for jobs. Handles all requests for the jobs collection + * or individual job instances. + */ +@Singleton +@Path("/v1/jobs") +public class JobsResource { + private static final Logger log = LoggerFactory.getLogger(JobsResource.class); + + /** The primary interface for interacting with jobs. */ + private final JobProxy jobProxy; + + /** + * Initializes a JobsResource with a {@link JobProxy} obtained from the + * {@link JobProxyFactory} class specified in the configuration. + * + * @param config the configuration containing the {@link JobProxyFactory} class. + */ + public JobsResource(JobsResourceConfig config) { + jobProxy = AbstractJobProxy.fromFactory(config); + } + + /** + * Gets the {@link Job} for all the jobs installed on this host. + * + * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response} containing a list of + * {@link Job} for all the Samza jobs installed on this host.
+ */ + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getInstalledJobs() { + try { + return Response.ok(jobProxy.getAllJobStatuses()).build(); + } catch (Exception e) { + log.error("Error in getInstalledJobs.", e); + return errorResponse(e.getMessage()); + } + } + + /** + * Gets the {@link Job} for the job instance specified by jobName and jobId if + * it is installed on this host. + * + * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}. + * @param jobId the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}. + * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response} + * containing a {@link Job} for the Samza job if it is + * installed on this host. {@link javax.ws.rs.core.Response.Status#NOT_FOUND} and + * {@link javax.ws.rs.core.Response.Status#INTERNAL_SERVER_ERROR} can occur for corresponding errors. + */ + @GET + @Path("/{jobName}/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + public Response getJob( + @PathParam("jobName") final String jobName, + @PathParam("jobId") final String jobId) { + JobInstance jobInstance = new JobInstance(jobName, jobId); + try { + if (!jobProxy.jobExists(jobInstance)) { + return Response.status(Response.Status.NOT_FOUND).entity(Collections.singletonMap("message", + String.format("%s does not exist.", jobInstance))).build(); + } + + Job job = jobProxy.getJobStatus(jobInstance); + return Response.ok(job).build(); + } catch (Exception e) { + log.error("Error in getJob.", e); + return errorResponse(e.getMessage()); + } + } + + /** + * Starts or stops the job instance specified by jobName and jobId, according to the + * status query parameter. The parameter is matched case-insensitively against {@link JobStatus}; + * only STARTED and STOPPED are supported. + * + * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}. + * @param jobId the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}. + * @param status the {@link JobStatus} to which the job will transition.
+ * @return a {@link javax.ws.rs.core.Response.Status#ACCEPTED} {@link javax.ws.rs.core.Response} + * containing a {@link Job} for the Samza job if it is + * installed on this host. {@link javax.ws.rs.core.Response.Status#NOT_FOUND} + * {@link javax.ws.rs.core.Response.Status#BAD_REQUEST} and + * {@link javax.ws.rs.core.Response.Status#INTERNAL_SERVER_ERROR} can occur for corresponding errors. + */ + @PUT + @Path("/{jobName}/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + public Response updateJobStatus( + @PathParam("jobName") final String jobName, + @PathParam("jobId") final String jobId, + @QueryParam("status") String status) { + JobInstance jobInstance = new JobInstance(jobName, jobId); + try { + if (!jobProxy.jobExists(jobInstance)) { + return Response.status(Response.Status.NOT_FOUND).entity(Collections + .singletonMap("message", String.format("Job %s instance %s is not installed on this host.", jobName, jobId))).build(); + } + + if (status == null) { + throw new IllegalArgumentException("Unrecognized status parameter: " + status); + } + + JobStatus samzaStatus = JobStatus.valueOf(status.toUpperCase()); + switch (samzaStatus) { + case STARTED: + log.info("Starting {}", jobInstance); + jobProxy.start(jobInstance); + Job infoStarted = jobProxy.getJobStatus(jobInstance); + return Response.accepted(infoStarted).build(); + case STOPPED: + log.info("Stopping {}", jobInstance); + jobProxy.stop(jobInstance); + Job infoStopped = jobProxy.getJobStatus(jobInstance); + return Response.accepted(infoStopped).build(); + default: + throw new IllegalArgumentException("Unsupported status: " + status); + } + } catch (IllegalArgumentException e) { + log.info(String.format("Illegal arguments updateJobStatus. 
JobName:%s JobId:%s Status=%s", jobName, jobId, status), e); + return Response.status(Response.Status.BAD_REQUEST).entity( + Collections.singletonMap("message", e.getMessage())).build(); + } catch (Exception e) { + log.error("Error in updateJobStatus.", e); + return errorResponse(String.format("Error type: %s message: %s", e.toString(), e.getMessage())); + } + } + + /** + * Constructs a consistent format for error responses. This method should be used for every error case. + * + * @param message the error message to report. + * @return the {@link Response} containing the error message. + */ + private Response errorResponse(String message) { + return Response.serverError().entity(Collections.singletonMap("message", message)).build(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java new file mode 100644 index 0000000..527482d --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.rest.proxy.installation.InstallationRecord; +import org.apache.samza.rest.proxy.job.JobProxy; +import org.apache.samza.rest.proxy.job.JobProxyFactory; + + +/** + * Configurations for the {@link JobsResource} endpoint. + */ +public class JobsResourceConfig extends MapConfig { + /** + * Specifies the canonical name of the {@link JobProxyFactory} class to produce + * {@link JobProxy} instances. + * + * To use your own proxy, implement the factory and specify the class for this config. + */ + public static final String CONFIG_JOB_PROXY_FACTORY = "job.proxy.factory.class"; + + /** + * The path where all the Samza jobs are installed (unzipped). Each subdirectory of this path + * is expected to be a Samza job installation and corresponds to one {@link InstallationRecord}. + */ + public static final String CONFIG_JOB_INSTALLATIONS_PATH = "job.installations.path"; + + /** + * Specifies the canonical name of the {@link org.apache.samza.config.ConfigFactory} to read the job configs. + */ + public static final String CONFIG_JOB_CONFIG_FACTORY = "job.config.factory.class"; + + /** + * Creates a JobsResourceConfig from the supplied config. + * + * @param config the config passed through to the {@link MapConfig} constructor. + */ + public JobsResourceConfig(Config config) { + super(config); + } + + /** + * @see JobsResourceConfig#CONFIG_JOB_PROXY_FACTORY + * @return the canonical name of the {@link JobProxyFactory} class to produce {@link JobProxy} instances.
+ */ + public String getJobProxyFactory() { + return get(CONFIG_JOB_PROXY_FACTORY); + } + + /** + * @see JobsResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH + * @return the path where all the Samza jobs are installed (unzipped). + */ + public String getInstallationsPath() { + return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH)); + } + + /** + * Ensures a usable file path when the user specifies a tilde for the home path. + * + * The home directory is inserted literally: it is quoted so that any backslash or dollar sign in it + * (e.g. a Windows home such as C:\Users\name) is not interpreted as regex replacement syntax. + * + * @param rawPath the original path. + * @return the updated path with a leading tilde resolved to the user.home system property, + * or null if rawPath is null. + */ + private static String sanitizePath(String rawPath) { + if (rawPath == null) { + return null; + } + return rawPath.replaceFirst("^~", java.util.regex.Matcher.quoteReplacement(System.getProperty("user.home"))); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java new file mode 100644 index 0000000..be83eb6 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources; + +import java.util.List; +import org.apache.samza.config.Config; + + +/** + * Instantiates a resource using the provided config. + * + * This is used to instantiate and register a specific instance of the object rather than registering the class. + */ +public interface ResourceFactory { + + /** + * Constructs and returns resource instances to register with the server. + * + * @param config the server config used to initialize the objects. + * @return a collection of instances to register with the server. + */ + List<? extends Object> getResourceInstances(Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java new file mode 100644 index 0000000..af3bf0a --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptOutputHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.script; + +import java.io.IOException; +import java.io.InputStream; + + +/** + * A script output handler processes the stream of output from the stdout and stderr channels of a script. + */ +public interface ScriptOutputHandler { + + /** + * Processes the script output represented by the InputStream. + * + * Implementations must fully process the stream or the script may hang. + * + * @param output the stream of output from the script. + * @throws IOException if there are problems reading the output. + */ + void processScriptOutput(InputStream output) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java new file mode 100644 index 0000000..dbb9849 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptPathProvider.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.script; + +import java.io.FileNotFoundException; +import org.apache.samza.rest.proxy.job.JobInstance; + + +/** + * Defines the protocol for getting script paths. + */ +public interface ScriptPathProvider { + /** + * @param jobInstance the job instance which may be used to access the job installation for the script. + * @param scriptName the name of the script file. Not the full path. + * @return the full path to the specified script. + * @throws FileNotFoundException if the script does not exist. + */ + String getScriptPath(JobInstance jobInstance, String scriptName) + throws FileNotFoundException; +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java new file mode 100644 index 0000000..b70a0f1 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.script; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.samza.SamzaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Runs a script process and returns the exit code. + * + * The script can be run with an output handler or with output redirected to console. + */ +public class ScriptRunner { + private static final Logger log = LoggerFactory.getLogger(ScriptRunner.class); + private static final int DEFAULT_SCRIPT_CMD_TIMEOUT_S = 30; + private int scriptTimeout = DEFAULT_SCRIPT_CMD_TIMEOUT_S; + + protected long getScriptTimeoutS() { + return scriptTimeout; + } + + /** + * Runs a script with IO inherited from the current Java process. Typically this redirects to console. + * + * @param scriptPath the path to the script file. + * @param args the command line args to pass to the script. + * @return the exit code returned by the script. + * @throws IOException if there was a problem running the process. + * @throws InterruptedException if the thread is interrupted while waiting for the process to finish. + */ + public int runScript(String scriptPath, String... args) + throws IOException, InterruptedException { + ProcessBuilder processBuilder = getProcessBuilder(scriptPath, args); + Process p = processBuilder.inheritIO().start(); + + return waitForExitValue(p); + } + + /** + * @param scriptPath the path to the script file. + * @param outputHandler the handler for any stdout and stderr produced by the script. + * @param args the command line args to pass to the script. + * @return the exit code returned by the script. + * @throws IOException if there was a problem running the process. 
+ * @throws InterruptedException if the thread is interrupted while waiting for the process to finish. + */ + public int runScript(String scriptPath, ScriptOutputHandler outputHandler, String... args) + throws IOException, InterruptedException { + ProcessBuilder processBuilder = getProcessBuilder(scriptPath, args); + Process p = processBuilder.redirectErrorStream(true).start(); + + InputStream output = p.getInputStream(); + outputHandler.processScriptOutput(output); + + return waitForExitValue(p); + } + + /** + * @param scriptPath the path to the script file. + * @param args the command line args to pass to the script. + * @return a {@link java.lang.ProcessBuilder} for the script and args. + */ + private ProcessBuilder getProcessBuilder(String scriptPath, String[] args) { + List<String> command = new ArrayList<>(args.length + 1); + command.add(scriptPath); + command.addAll(Arrays.asList(args)); + + log.debug("Building process with command {}", command); + return new ProcessBuilder(command); + } + + /** + * Waits for a finite time interval for the script to complete. + * + * @param p the process on which this method will wait. + * @return the exit code returned by the process. + * @throws InterruptedException if the thread is interrupted while waiting for the process to finish. 
+ */ + private int waitForExitValue(final Process p) + throws InterruptedException { + log.debug("Waiting for the exit value for process {}", p); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + p.waitFor(); + } catch (InterruptedException ignore) { + return; + } + } + }); + + t.start(); + try { + t.join(TimeUnit.MILLISECONDS.convert(getScriptTimeoutS(), TimeUnit.SECONDS)); + } catch (InterruptedException e) { + t.interrupt(); + throw new SamzaException("Timeout running shell command", e); + } + + int exitVal = p.exitValue(); + log.debug("Exit value {}", exitVal); + return exitVal; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/resources/log4j.xml b/samza-rest/src/main/resources/log4j.xml new file mode 100644 index 0000000..c5c1556 --- /dev/null +++ b/samza-rest/src/main/resources/log4j.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+ +--> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${samza.log.dir}/samza-rest-service.log" /> + <param name="MaxFileSize" value="256MB" /> + <param name="MaxBackupIndex" value="20" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" /> + </layout> + </appender> + + <logger name="org.apache.hadoop"> + <level value="off" /> + </logger> + + <root> + <priority value="info" /> + <appender-ref ref="RollingAppender"/> + </root> +</log4j:configuration> http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java new file mode 100644 index 0000000..1da3430 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor; + +import org.apache.samza.config.MapConfig; +import org.apache.samza.monitor.mock.ExceptionThrowingMonitor; +import org.apache.samza.monitor.mock.InstantSchedulingProvider; +import org.apache.samza.rest.SamzaRestConfig; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.fail; + +public class TestMonitorService { + + @Test + public void testGetMonitorsFromClassName() { + // Test that monitors are instantiated properly from config strings. + Monitor monitor = null; + try { + monitor = MonitorLoader.fromClassName("org.apache.samza.monitor.mock.DummyMonitor"); + } catch (InstantiationException e) { + fail(); + } + + // Object should implement monitor(). + try { + monitor.monitor(); + } catch (Exception e) { + fail(); + } + } + + @Test + public void testMonitorExceptionIsolation() { + // Test that an exception from a monitor doesn't bubble up out of the scheduler. + Monitor monitor = new ExceptionThrowingMonitor(); + InstantSchedulingProvider provider = new InstantSchedulingProvider(); + + // Initialize with a monitor that immediately throws an exception when run. + Map<String, String> map = new HashMap<>(); + map.put(SamzaRestConfig.CONFIG_MONITOR_CLASSES, "org.apache.samza.monitor.mock.ExceptionThrowingMonitor"); + map.put(SamzaRestConfig.CONFIG_MONITOR_INTERVAL_MS, "1"); + SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map)); + SamzaMonitorService monitorService = new SamzaMonitorService(config, provider); + + // This will throw if the exception isn't caught within the provider. 
+ monitorService.start(); + monitorService.stop(); + } + + @Test + public void testScheduledExecutorSchedulingProvider() { + // Test that the monitor is scheduled by the ScheduledExecutorSchedulingProvider + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + ScheduledExecutorSchedulingProvider provider = + new ScheduledExecutorSchedulingProvider(executorService); + + // notifyingMonitor.monitor() should be called repeatedly. + final CountDownLatch wasCalledLatch = new CountDownLatch(3); + + final Monitor notifyingMonitor = new Monitor() { + @Override + public void monitor() { + wasCalledLatch.countDown(); + } + }; + + Runnable runnableMonitor = new Runnable() { + public void run() { + try { + notifyingMonitor.monitor(); + } catch (Exception e) { + // Must be caught because they are checked in monitor() + fail(); + } + } + }; + + // monitor should get called every 1ms, so if await() misses the first call, there will be more. + provider.schedule(runnableMonitor, 1); + + try { + assertTrue(wasCalledLatch.await(5l, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + executorService.shutdownNow(); + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java new file mode 100644 index 0000000..8621db1 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor.mock; + +import org.apache.samza.monitor.Monitor; + +public class DummyMonitor implements Monitor { + + public void monitor() { + // Do nothing! + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java new file mode 100644 index 0000000..c4f3f73 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor.mock; + +import org.apache.samza.monitor.Monitor; + +import java.io.IOException; + +public class ExceptionThrowingMonitor implements Monitor { + public void monitor() throws IOException { + throw new IOException("I don't know what I was expecting."); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java new file mode 100644 index 0000000..6ae80e6 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor.mock; + +import org.apache.samza.monitor.SchedulingProvider; + +/** + * Instead of scheduling a monitor to run, just runs it ASAP. + */ +public class InstantSchedulingProvider implements SchedulingProvider { + + public void schedule(Runnable runnableMonitor, int interval) { + runnableMonitor.run(); + } + + // Nothing to stop because no deferred task was started + public void stop() {} +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java new file mode 100644 index 0000000..7db437b --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Form; +import javax.ws.rs.core.Response; +import org.apache.samza.config.MapConfig; +import org.apache.samza.rest.SamzaRestApplication; +import org.apache.samza.rest.SamzaRestConfig; +import org.apache.samza.rest.model.Job; +import org.apache.samza.rest.model.JobStatus; +import org.apache.samza.rest.resources.mock.MockJobProxy; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + + +public class TestJobsResource extends JerseyTest { + ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper(); + + @Override + protected Application configure() { + Map<String, String> map = new HashMap<>(); + map.put(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, "org.apache.samza.rest.resources.mock.MockJobProxyFactory"); + map.put(JobsResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH, "."); + SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map)); + return new SamzaRestApplication(config); + } + + @Test + public void testGetJobs() + throws IOException { + + Response resp = target("v1/jobs").request().get(); + assertEquals(200, resp.getStatus()); + final Job[] jobs = objectMapper.readValue(resp.readEntity(String.class), Job[].class); + assertEquals(4, jobs.length); + + assertEquals(MockJobProxy.JOB_INSTANCE_1_NAME, jobs[0].getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_1_ID, jobs[0].getJobId()); + assertStatusNotDefault(jobs[0]); + assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, 
jobs[1].getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, jobs[1].getJobId()); + assertStatusNotDefault(jobs[1]); + assertEquals(MockJobProxy.JOB_INSTANCE_3_NAME, jobs[2].getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_3_ID, jobs[2].getJobId()); + assertStatusNotDefault(jobs[2]); + assertEquals(MockJobProxy.JOB_INSTANCE_4_NAME, jobs[3].getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_4_ID, jobs[3].getJobId()); + assertStatusNotDefault(jobs[3]); + resp.close(); + } + + @Test + public void testPostJobs() + throws IOException { + Response resp = target("v1/jobs").request().post(Entity.text("")); + assertEquals(405, resp.getStatus()); + resp.close(); + } + + @Test + public void testPutJobs() + throws IOException { + Response resp = target("v1/jobs").request().put(Entity.text("")); + assertEquals(405, resp.getStatus()); + resp.close(); + } + + @Test + public void testGetJob() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request().get(); + assertEquals(200, resp.getStatus()); + final Job job2 = objectMapper.readValue(resp.readEntity(String.class), Job.class); + + assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId()); + assertStatusNotDefault(job2); + resp.close(); + } + + @Test + public void testPostJob() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request().post( + Entity.text("")); + assertEquals(405, resp.getStatus()); + resp.close(); + } + + @Test + public void testGetJobNameNotFound() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", "BadJobName", MockJobProxy.JOB_INSTANCE_2_ID)).request().get(); + assertEquals(404, resp.getStatus()); + + final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new 
TypeReference<Map<String, String>>() {}); + assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("does not exist")); + resp.close(); + } + + @Test + public void testGetJobIdNotFound() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, "BadJobId")).request().get(); + assertEquals(404, resp.getStatus()); + + final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {}); + assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("does not exist")); + resp.close(); + } + + @Test + public void testGetJobNameWithoutId() + throws IOException { + Response resp = target(String.format("v1/jobs/%s", MockJobProxy.JOB_INSTANCE_2_NAME)).request().get(); + assertEquals(404, resp.getStatus()); + resp.close(); + } + + @Test + public void testStartJob() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)) + .queryParam("status", "started").request().put(Entity.form(new Form())); + assertEquals(202, resp.getStatus()); + + final Job job2 = objectMapper.readValue(resp.readEntity(String.class), Job.class); + assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId()); + assertStatusNotDefault(job2); + resp.close(); + } + + @Test + public void testStopJob() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)) + .queryParam("status", "stopped").request().put(Entity.form(new Form())); + assertEquals(202, resp.getStatus()); + + final Job job2 = objectMapper.readValue(resp.readEntity(String.class), Job.class); + assertEquals(MockJobProxy.JOB_INSTANCE_2_NAME, job2.getJobName()); + assertEquals(MockJobProxy.JOB_INSTANCE_2_ID, job2.getJobId()); + 
assertStatusNotDefault(job2); + resp.close(); + } + + @Test + public void testPutBadJobStatus() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)) + .queryParam("status", "BADSTATUS").request().put(Entity.form(new Form())); + assertEquals(400, resp.getStatus()); + + final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {}); + assertTrue(errorMessage.get("message").contains("BADSTATUS")); + resp.close(); + } + + @Test + public void testPutMissingStatus() + throws IOException { + Response resp = target(String.format("v1/jobs/%s/%s", MockJobProxy.JOB_INSTANCE_2_NAME, MockJobProxy.JOB_INSTANCE_2_ID)).request() + .put(Entity.form(new Form())); + assertEquals(400, resp.getStatus()); + + final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {}); + assertTrue(errorMessage.get("message").contains("status")); + resp.close(); + } + + private void assertStatusNotDefault(Job job) { + // Job status should be populated, not the defaults. + // We're not testing whether it value matches a specific value here, + // just that the value reflects whatever the JobStatusProvider returns. 
+ assertFalse(JobStatus.UNKNOWN == job.getStatus()); + assertNotNull(job.getStatusDetail()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java new file mode 100644 index 0000000..96f8db5 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxy.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources.mock; + +import java.util.LinkedHashSet; +import java.util.Set; +import org.apache.samza.rest.proxy.job.AbstractJobProxy; +import org.apache.samza.rest.proxy.job.JobInstance; +import org.apache.samza.rest.proxy.job.JobStatusProvider; +import org.apache.samza.rest.resources.JobsResourceConfig; + + +public class MockJobProxy extends AbstractJobProxy { + + public static final String JOB_INSTANCE_1_NAME = "Job1"; + public static final String JOB_INSTANCE_1_ID = "i001"; + + public static final String JOB_INSTANCE_2_NAME = "Job1"; + public static final String JOB_INSTANCE_2_ID = "i002"; + + public static final String JOB_INSTANCE_3_NAME = "Job2"; + public static final String JOB_INSTANCE_3_ID = "i001"; + + public static final String JOB_INSTANCE_4_NAME = "Job3"; + public static final String JOB_INSTANCE_4_ID = "1"; + /** + * Required constructor. + * + + * @param config the config containing the installations path. + */ + public MockJobProxy(JobsResourceConfig config) { + super(config); + } + + @Override + protected JobStatusProvider getJobStatusProvider() { + return new MockJobStatusProvider(); + } + + @Override + protected Set<JobInstance> getAllJobInstances() { + Set<JobInstance> validatedInstallations = new LinkedHashSet<>(); + + validatedInstallations.add(new JobInstance(JOB_INSTANCE_1_NAME, JOB_INSTANCE_1_ID)); + validatedInstallations.add(new JobInstance(JOB_INSTANCE_2_NAME, JOB_INSTANCE_2_ID)); + validatedInstallations.add(new JobInstance(JOB_INSTANCE_3_NAME, JOB_INSTANCE_3_ID)); + + validatedInstallations.add(new JobInstance(JOB_INSTANCE_4_NAME, JOB_INSTANCE_4_ID)); + + return validatedInstallations; + } + + @Override + public void start(JobInstance jobInstance) + throws Exception { + + } + + @Override + public void stop(JobInstance jobInstance) + throws Exception { + + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java new file mode 100644 index 0000000..03e95b1 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobProxyFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources.mock; + +import org.apache.samza.rest.proxy.job.JobProxy; +import org.apache.samza.rest.proxy.job.JobProxyFactory; +import org.apache.samza.rest.resources.JobsResourceConfig; + + +public class MockJobProxyFactory implements JobProxyFactory{ + @Override + public JobProxy getJobProxy(JobsResourceConfig config) { + return new MockJobProxy(config); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java new file mode 100644 index 0000000..df0f18a --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockJobStatusProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources.mock; + +import java.io.IOException; +import java.util.Collection; +import org.apache.samza.rest.proxy.job.JobInstance; +import org.apache.samza.rest.proxy.job.JobStatusProvider; +import org.apache.samza.rest.model.Job; +import org.apache.samza.rest.model.JobStatus; + + +public class MockJobStatusProvider implements JobStatusProvider { + @Override + public void getJobStatuses(Collection<Job> jobs) + throws IOException, InterruptedException { + for (Job info : jobs) { + setStatusStarted(info); + } + } + + @Override + public Job getJobStatus(JobInstance jobInstance) + throws IOException, InterruptedException { + Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId()); + setStatusStarted(info); + return info; + } + + private void setStatusStarted(Job info) { + info.setStatus(JobStatus.STARTED); + info.setStatusDetail("RUNNING"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-shell/src/main/bash/kill-yarn-job-by-name.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/kill-yarn-job-by-name.sh b/samza-shell/src/main/bash/kill-yarn-job-by-name.sh new file mode 100755 index 0000000..06eacc7 --- /dev/null +++ b/samza-shell/src/main/bash/kill-yarn-job-by-name.sh @@ -0,0 +1,38 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
# Kills the YARN application whose name matches the first argument.
#
# Usage: kill-yarn-job-by-name.sh <job-name>
#
# Exit status:
#   0    no matching application was found (nothing to do)
#   1    no job name was given
#   150  more than one application matched the name
#   else the exit status of kill-yarn-job.sh

# Refuse to run without a name: an empty $1 would reduce the grep pattern below to
# "two whitespace characters", which can match an unrelated application.
if [ -z "$1" ]; then
  echo "Usage: $(basename "$0") <job-name>" >&2
  exit 1
fi

BASE_DIR="$(dirname "$0")"

[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$BASE_DIR/log4j-console.xml"

# Get the id for the app with the specified name.
# NOTE(review): this relies on "application -list" only returning apps that are still
# ACCEPTED or RUNNING (its default state filter) -- confirm for the Hadoop version in use.
# $1 is interpolated into the grep pattern, so regex metacharacters in the name are not literal.
APP_ID=$(exec "$BASE_DIR"/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list | grep "[[:space:]]$1[[:space:]]" | grep "application_" | awk -F ' ' '{ print $1 }')
echo "Job name ($1) matched app IDs: ($APP_ID)"

# If the app id was not found, it either doesn't exist or was already stopped.
if [ -z "$APP_ID" ]; then
  exit 0
fi

# Verify that only one application matches
COUNT=$(echo "$APP_ID" | wc -l)
if [[ "$COUNT" -gt 1 ]]; then
  echo "Job name ($1) matched $COUNT applications; refusing to kill." >&2
  exit 150
fi

# Kill the job; this script exits with kill-yarn-job.sh's exit status.
"$BASE_DIR"/kill-yarn-job.sh "$APP_ID"
