SAMZA-975 - Initial Samza REST Implementation
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/260d1ff9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/260d1ff9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/260d1ff9 Branch: refs/heads/master Commit: 260d1ff963eaa6ea6b0eb57ea9585a4751f40ddd Parents: 17e65d1 Author: Jacob Maes <[email protected]> Authored: Mon Aug 22 23:38:13 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Mon Aug 22 23:38:13 2016 -0700 ---------------------------------------------------------------------- LICENSE | 7 + build.gradle | 58 +++++ gradle/dependency-versions.gradle | 1 + .../src/main/bash/run-samza-rest-service.sh | 22 ++ .../src/main/config/samza-rest.properties | 23 ++ .../java/org/apache/samza/monitor/Monitor.java | 39 ++++ .../org/apache/samza/monitor/MonitorLoader.java | 43 ++++ .../samza/monitor/SamzaMonitorService.java | 95 +++++++++ .../ScheduledExecutorSchedulingProvider.java | 40 ++++ .../samza/monitor/SchedulingProvider.java | 35 ++++ .../apache/samza/rest/SamzaRestApplication.java | 94 +++++++++ .../org/apache/samza/rest/SamzaRestConfig.java | 129 ++++++++++++ .../org/apache/samza/rest/SamzaRestService.java | 159 ++++++++++++++ .../java/org/apache/samza/rest/model/Job.java | 81 +++++++ .../org/apache/samza/rest/model/JobStatus.java | 53 +++++ .../proxy/installation/InstallationFinder.java | 47 +++++ .../proxy/installation/InstallationRecord.java | 70 +++++++ .../installation/SimpleInstallationFinder.java | 131 ++++++++++++ .../samza/rest/proxy/job/AbstractJobProxy.java | 139 ++++++++++++ .../samza/rest/proxy/job/JobInstance.java | 78 +++++++ .../apache/samza/rest/proxy/job/JobProxy.java | 78 +++++++ .../samza/rest/proxy/job/JobProxyFactory.java | 41 ++++ .../samza/rest/proxy/job/JobStatusProvider.java | 49 +++++ .../samza/rest/proxy/job/ScriptJobProxy.java | 71 +++++++ .../rest/proxy/job/SimpleYarnJobProxy.java | 114 ++++++++++ 
.../proxy/job/SimpleYarnJobProxyFactory.java | 35 ++++ .../proxy/job/YarnCliJobStatusProvider.java | 154 ++++++++++++++ .../rest/resources/DefaultResourceFactory.java | 35 ++++ .../samza/rest/resources/JobsResource.java | 175 ++++++++++++++++ .../rest/resources/JobsResourceConfig.java | 84 ++++++++ .../samza/rest/resources/ResourceFactory.java | 39 ++++ .../samza/rest/script/ScriptOutputHandler.java | 40 ++++ .../samza/rest/script/ScriptPathProvider.java | 37 ++++ .../apache/samza/rest/script/ScriptRunner.java | 129 ++++++++++++ samza-rest/src/main/resources/log4j.xml | 41 ++++ .../samza/monitor/TestMonitorService.java | 115 ++++++++++ .../apache/samza/monitor/mock/DummyMonitor.java | 28 +++ .../monitor/mock/ExceptionThrowingMonitor.java | 29 +++ .../monitor/mock/InstantSchedulingProvider.java | 34 +++ .../samza/rest/resources/TestJobsResource.java | 209 +++++++++++++++++++ .../samza/rest/resources/mock/MockJobProxy.java | 81 +++++++ .../resources/mock/MockJobProxyFactory.java | 31 +++ .../resources/mock/MockJobStatusProvider.java | 50 +++++ .../src/main/bash/kill-yarn-job-by-name.sh | 38 ++++ settings.gradle | 3 +- 45 files changed, 3083 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/LICENSE ---------------------------------------------------------------------- diff --git a/LICENSE b/LICENSE index e1439fe..1d81cd3 100644 --- a/LICENSE +++ b/LICENSE @@ -252,3 +252,10 @@ SIL Open Font License (OFT) - http://scripts.sil.org/OFL/ - Font-awesome font files v4.0.3 (http://fortawesome.github.io/Font-Awesome/) - Ropa Sans fonts v1.1 - Copyright (c) 2011, Botjo Nikoltchev, with Reserved Font Name "Ropa Sans" +----------------------------------------------------------------------- + The CDDL License +----------------------------------------------------------------------- + +The Apache Samza project bundles the following files under the CDDL License + +- Jersey 
v2.22.1 (https://jersey.java.net/license.html) - Copyright (c) 2010-2015 Oracle Corporation \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 1d4eb74..004c81e 100644 --- a/build.gradle +++ b/build.gradle @@ -469,6 +469,64 @@ project(":samza-hdfs_$scalaVersion") { } } +project(":samza-rest") { + apply plugin: 'java' + + dependencies { + compile project(":samza-shell") + compile project(":samza-core_$scalaVersion") + + runtime "org.slf4j:slf4j-log4j12:$slf4jVersion" + compile "com.google.guava:guava:$guavaVersion" + compile "org.glassfish.jersey.core:jersey-server:$jerseyVersion" + compile "org.glassfish.jersey.containers:jersey-container-servlet-core:$jerseyVersion" + compile "org.glassfish.jersey.containers:jersey-container-jetty-http:$jerseyVersion" + compile "org.glassfish.jersey.media:jersey-media-moxy:$jerseyVersion" + compile "org.eclipse.jetty.aggregate:jetty-all:$jettyVersion" + compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") { + exclude module: 'slf4j-log4j12' + exclude module: 'servlet-api' + exclude group: 'com.sun.jersey' + } + runtime("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") { + exclude module: 'slf4j-log4j12' + exclude module: 'servlet-api' + exclude group: 'com.sun.jersey' + } + + testCompile "junit:junit:$junitVersion" + testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion" + } + + tasks.create(name: "releaseRestServiceTar", type: Tar) { + description 'Build a tarball containing the samza-rest component and its dependencies' + compression = Compression.GZIP + from(file("$projectDir/src/main/config")) { into "config/" } + from(file("$projectDir/src/main/resources/log4j.xml")) { into "bin/" } + from(file("$projectDir/src/main/bash/run-samza-rest-service.sh")) { into "bin/" } + 
from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "bin/" } + from '../LICENSE' + from '../NOTICE' + from(configurations.runtime) { into("lib/") } + from(jar) { into("lib/") } + } + + tasks.create(name: "restTarGz", type: Tar) { + description 'Build a tarball containing the samza-rest supplementary files' + compression = Compression.GZIP + from 'src/main/bash' + from 'src/main/resources' + from(project(':samza-shell').file("src/main/bash/run-class.sh")) + } + + artifacts { + archives(restTarGz) { + name 'samza-rest-scripts' + classifier 'dist' + } + } +} + project(":samza-test_$scalaVersion") { apply plugin: 'scala' apply plugin: 'checkstyle' http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 47c71bf..8c757b9 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -18,6 +18,7 @@ */ ext { elasticsearchVersion = "1.5.1" + jerseyVersion = "2.22.1" jodaTimeVersion = "2.2" joptSimpleVersion = "3.2" jacksonVersion = "1.9.13" http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/bash/run-samza-rest-service.sh ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/bash/run-samza-rest-service.sh b/samza-rest/src/main/bash/run-samza-rest-service.sh new file mode 100755 index 0000000..bd52afd --- /dev/null +++ b/samza-rest/src/main/bash/run-samza-rest-service.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j.xml" +[[ -z "$SAMZA_LOG_DIR" ]] && export SAMZA_LOG_DIR="$PWD/logs" + +exec $(dirname $0)/run-class.sh org.apache.samza.rest.SamzaRestService "$@" http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/config/samza-rest.properties ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/config/samza-rest.properties b/samza-rest/src/main/config/samza-rest.properties new file mode 100644 index 0000000..7be0b47 --- /dev/null +++ b/samza-rest/src/main/config/samza-rest.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# Service port. Set to 0 for a dynamic port. +services.rest.port=9139 + +# JobsResource +job.proxy.factory.class=org.apache.samza.rest.proxy.job.SimpleYarnJobProxyFactory +job.installations.path=/export/content/samza/deploy/ http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java new file mode 100644 index 0000000..d69df5f --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor; + +/** + * A Monitor is a class implementing some functionality that should be done every N milliseconds on a YARN RM or NM. + * Classes specified in the config will have their monitor() method called at a configurable interval. 
+ * For example, one could implement a Monitor that checks for leaked containers and kills them, ensuring that + * no leaked container survives on a NodeManager host for more than N ms (where N is the monitor run interval.) + * + * Implementations can override .toString() for better logging. + */ +public interface Monitor { + + /** + * Do the work of the monitor. Because this can be arbitrary behavior up to and including script execution, + * IPC-related IOExceptions and concurrency-related InterruptedExceptions are caught by the SamzaMonitorService. + * @throws Exception if there was any problem running the monitor. + */ + void monitor() + throws Exception; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java new file mode 100644 index 0000000..75f3867 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor; + +import java.lang.reflect.Constructor; + +class MonitorLoader { + + private MonitorLoader() {} + + public static Monitor fromClassName(String monitorClassName) + throws InstantiationException { + Object monitorObject; + try { + Class<?> klass = Class.forName(monitorClassName); + Constructor<?> constructor = klass.getConstructor(); + monitorObject = constructor.newInstance(); + } catch (Exception e) { + throw (InstantiationException) + new InstantiationException("Unable to instantiate " + monitorClassName).initCause(e); + } + if (!(monitorObject instanceof Monitor)) { + throw new InstantiationException(monitorClassName + " is not an instance of Monitor"); + } + return (Monitor) monitorObject; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java new file mode 100644 index 0000000..2f4d9dd --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor; + +import org.apache.samza.rest.SamzaRestConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +/** + * The class responsible for handling long-running/scheduled monitors in Samza REST. + * Takes a SamzaRestConfig object in the constructor and handles instantiation of + * monitors and scheduling them to run based on the properties in the config. + */ +public class SamzaMonitorService { + + private static final Logger log = LoggerFactory.getLogger(SamzaMonitorService.class); + + private final SchedulingProvider scheduler; + private final SamzaRestConfig config; + + public SamzaMonitorService(SamzaRestConfig config, SchedulingProvider schedulingProvider) { + this.scheduler = schedulingProvider; + this.config = config; + } + + public void start() { + List<Monitor> monitors = getMonitorsFromConfig(config); + int monitorRunInterval = config.getConfigMonitorIntervalMs(); + for (Monitor monitor : monitors) { + log.debug("Scheduling monitor {} to run every {}ms", monitor, monitorRunInterval); + this.scheduler.schedule(getRunnable(monitor), monitorRunInterval); + } + } + + public void stop() { + this.scheduler.stop(); + } + + private Runnable getRunnable(final Monitor monitor) { + return new Runnable() { + public void run() { + try { + monitor.monitor(); + } catch (IOException e) { + log.warn("Caught IOException during " + monitor.toString() + ".monitor()", e); + } catch (InterruptedException e) { + log.warn("Caught 
InterruptedException during " + monitor.toString() + ".monitor()", e); + } catch (Exception e) { + log.warn("Unexpected exception during {}.monitor()", monitor, e); + } + } + }; + } + + /** + * Get all the registered monitors for the service. + * @return a list of Monitor objects ready to be scheduled. + */ + private static List<Monitor> getMonitorsFromConfig(SamzaRestConfig config) { + List<String> classNames = config.getConfigMonitorClassList(); + List<Monitor> monitors = new ArrayList<>(); + + for (String name: classNames) { + try { + Monitor monitor = MonitorLoader.fromClassName(name); + monitors.add(monitor); + } catch (InstantiationException e) { + log.warn("Unable to instantiate monitor " + name, e); + } + } + return monitors; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java new file mode 100644 index 0000000..c0c448c --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor; + +import java.util.concurrent.ScheduledExecutorService; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class ScheduledExecutorSchedulingProvider implements SchedulingProvider { + + private final ScheduledExecutorService scheduler; + + public ScheduledExecutorSchedulingProvider(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + public void schedule(Runnable runnable, int interval) { + this.scheduler.scheduleAtFixedRate(runnable, 0, interval, MILLISECONDS); + } + + public void stop() { + this.scheduler.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java new file mode 100644 index 0000000..aea1a92 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Provides scheduling functionality to the SamzaMonitorService. + */ +public interface SchedulingProvider { + /* Schedule a the given Runnable to run() every INTERVAL ms. */ + void schedule(Runnable runnable, int intervalMs); + + /* Stop any future executions of any scheduled tasks. */ + void stop(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java new file mode 100644 index 0000000..61f3c46 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest; + +import java.util.Collection; +import java.util.List; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.rest.resources.DefaultResourceFactory; +import org.apache.samza.rest.resources.ResourceFactory; +import org.codehaus.jackson.jaxrs.JacksonJsonProvider; +import org.glassfish.jersey.server.ResourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Samza REST implementation of the JAX-RS {@link javax.ws.rs.core.Application} model. + */ +public class SamzaRestApplication extends ResourceConfig { + + private static final Logger log = LoggerFactory.getLogger(SamzaRestApplication.class); + + public SamzaRestApplication(SamzaRestConfig config) { + register(JacksonJsonProvider.class); + registerConfiguredResources(config); + } + + /** + * Registers resources specified in the config. If there are no factories + * or resources specified in the config, it uses the + * {@link org.apache.samza.rest.resources.DefaultResourceFactory} + * + * @param config the config to pass to the factories. + */ + private void registerConfiguredResources(SamzaRestConfig config) { + try { + // Use default if there were no configured resources or factories + if (config.getResourceFactoryClassNames().isEmpty() && config.getResourceClassNames().isEmpty()) { + log.info("No resource factories or classes configured. 
Using DefaultResourceFactory."); + registerInstances(new DefaultResourceFactory().getResourceInstances(config).toArray()); + return; + } + + for (String factoryClassName : config.getResourceFactoryClassNames()) { + log.info("Invoking factory {}", factoryClassName); + registerInstances(instantiateFactoryResources(factoryClassName, config).toArray()); + } + + for (String resourceClassName : config.getResourceClassNames()) { + log.info("Using resource class {}", resourceClassName); + register(Class.forName(resourceClassName)); + } + } catch (Throwable t) { + throw new SamzaException(t); + } + } + + /** + * Passes the specified config to the specified factory to instantiate its resources. + * + * @param factoryClassName the name of a class that implements {@link ResourceFactory} + * @param config the config to pass to the factory + * @return a collection of resources returned by the factory. + * @throws InstantiationException + */ + private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config) + throws InstantiationException { + try { + Class factoryCls = Class.forName(factoryClassName); + ResourceFactory factory = (ResourceFactory) factoryCls.newInstance(); + return factory.getResourceInstances(config); + } catch (Exception e) { + throw (InstantiationException) + new InstantiationException("Unable to instantiate " + factoryClassName).initCause(e); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java new file mode 100644 index 0000000..6f5c10a --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; + + +/** + * The set of configurations required by the core components of the {@link org.apache.samza.rest.SamzaRestService}. + * Other configurations (e.g. from {@link org.apache.samza.config.JobConfig}) may also be used by some of the + * implementation classes. + */ +public class SamzaRestConfig extends MapConfig { + /** + * Specifies a comma-delimited list of class names that implement ResourceFactory. + * These factories will be used to create specific instances of resources, passing the server config. + */ + public static final String CONFIG_REST_RESOURCE_FACTORIES = "rest.resource.factory.classes"; + + /** + * Specifies a comma-delimited list of class names of resources to register with the server. + */ + public static final String CONFIG_REST_RESOURCE_CLASSES = "rest.resource.classes"; + + /** + * Specifies a comma-delimited list of class names corresponding to Monitor implementations. + * These will be instantiated and scheduled to run periodically at runtime. 
+ * Note that you must include the ENTIRE package name (org.apache.samza...). + */ + public static final String CONFIG_MONITOR_CLASSES = "monitor.classes"; + + /** + * Specifies the interval at which each registered Monitor's monitor method will be called. + */ + public static final String CONFIG_MONITOR_INTERVAL_MS = "monitor.run.interval.ms"; + + /** + * Monitors run every 60s by default + */ + private static final int DEFAULT_MONITOR_INTERVAL = 60000; + + /** + * The port number to use for the HTTP server or 0 to dynamically choose a port. + */ + public static final String CONFIG_SAMZA_REST_SERVICE_PORT = "services.rest.port"; + + public SamzaRestConfig(Config config) { + super(config); + } + + /** + * @see SamzaRestConfig#CONFIG_SAMZA_REST_SERVICE_PORT + * @return the port number to use for the HTTP server or 0 to dynamically choose a port. + */ + public int getPort() { + return getInt(CONFIG_SAMZA_REST_SERVICE_PORT); + } + + /** + * @see SamzaRestConfig#CONFIG_REST_RESOURCE_FACTORIES + * @return a list of class names as Strings corresponding to factories + * that Samza REST should use to instantiate and register resources + * or an empty list if none were configured. + */ + public List<String> getResourceFactoryClassNames() { + return parseCommaDelimitedStrings(get(CONFIG_REST_RESOURCE_FACTORIES)); + } + + /** + * @see SamzaRestConfig#CONFIG_REST_RESOURCE_CLASSES + * @return a list of class names as Strings corresponding to resource classes + * that Samza REST should register or an empty list if none were configured. + */ + public List<String> getResourceClassNames() { + return parseCommaDelimitedStrings(get(CONFIG_REST_RESOURCE_CLASSES)); + } + + /** + * @see SamzaRestConfig#CONFIG_MONITOR_CLASSES + * @return a list of class names as Strings corresponding to Monitors that + * Samza REST should schedule or an empty list if none were configured. 
+ */ + public List<String> getConfigMonitorClassList() { + return parseCommaDelimitedStrings(get(CONFIG_MONITOR_CLASSES)); + } + + /** + * @see SamzaRestConfig#CONFIG_MONITOR_INTERVAL_MS + * @return an integer number of milliseconds, the period at which to schedule monitor runs. + */ + public int getConfigMonitorIntervalMs() { + return getInt(CONFIG_MONITOR_INTERVAL_MS, DEFAULT_MONITOR_INTERVAL); + } + + /** + * Parses a string containing a set of comma-delimited strings. Whitespace is ignored. + * If the input string is null or empty, an empty list is returned. + * + * @param commaDelimitedStrings the string to parse. + * @return the list of strings parsed from the input or an empty list if none. + */ + private static List<String> parseCommaDelimitedStrings(String commaDelimitedStrings) { + if (commaDelimitedStrings == null || commaDelimitedStrings.trim().isEmpty()) { + return Collections.emptyList(); + } + return Arrays.asList(commaDelimitedStrings.split("\\s*,\\s*")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java new file mode 100644 index 0000000..5b34da8 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest; + +import joptsimple.OptionSet; +import org.apache.samza.config.MapConfig; +import org.apache.samza.monitor.SamzaMonitorService; +import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider; +import org.apache.samza.util.CommandLine; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.Servlet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + + +/** + * The main Java class for the Samza REST API. It runs an embedded Jetty server so it can be deployed as a Jar file. + * + * This class can be started from the command line by providing the --config-path parameter to a Samza REST config file + * which will be used to configure the default resources exposed by the API. + * + * It can also be managed programmatically using the + * {@link org.apache.samza.rest.SamzaRestService#addServlet(javax.servlet.Servlet, String)}, + * {@link #start()} and {@link #stop()} methods. 
+ */ +public class SamzaRestService { + + private static final Logger log = LoggerFactory.getLogger(SamzaRestService.class); + + private final Server server; + private final ServletContextHandler context; + + + public SamzaRestService(SamzaRestConfig config) { + log.info("Creating new SamzaRestService with config: {}", config); + server = new Server(config.getPort()); + + context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + server.setHandler(context); + } + + /** + * Command line interface to run the server. + * + * @param args arguments supported by {@link org.apache.samza.util.CommandLine}. + * In particular, --config-path and --config-factory are used to read the Samza REST config file. + * @throws Exception if the server could not be successfully started. + */ + public static void main(String[] args) + throws Exception { + try { + SamzaRestConfig config = parseConfig(args); + SamzaRestService restService = new SamzaRestService(config); + + // Add applications + SamzaRestApplication samzaRestApplication = new SamzaRestApplication(config); + ServletContainer container = new ServletContainer(samzaRestApplication); + restService.addServlet(container, "/*"); + + // Schedule monitors to run + ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1); + ScheduledExecutorSchedulingProvider schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService); + SamzaMonitorService monitorService = new SamzaMonitorService(config, schedulingProvider); + monitorService.start(); + + restService.runBlocking(); + monitorService.stop(); + } catch (Throwable t) { + log.error("Exception in main.", t); + } + } + + /** + * Reads a {@link org.apache.samza.config.Config} from command line parameters. + * @param args the command line parameters supported by {@link org.apache.samza.util.CommandLine}. + * @return the parsed {@link org.apache.samza.config.Config}. 
+ */ + private static SamzaRestConfig parseConfig(String[] args) { + CommandLine cmd = new CommandLine(); + OptionSet options = cmd.parser().parse(args); + MapConfig cfg = cmd.loadConfig(options); + return new SamzaRestConfig(new MapConfig(cfg)); + } + + /** + * Adds the specified {@link javax.servlet.Servlet} to the server at the specified path. + * @param servlet the {@link javax.servlet.Servlet} to be added. + * @param path the path for the servlet. + */ + public void addServlet(Servlet servlet, String path) { + log.info("Adding servlet {} for path {}", servlet, path); + ServletHolder holder = new ServletHolder(servlet); + context.addServlet(holder, path); + holder.setInitOrder(0); + } + + /** + * Runs the server and waits for it to finish. + * + * @throws Exception if the server could not be successfully started. + */ + private void runBlocking() + throws Exception { + try { + start(); + server.join(); + } finally { + server.destroy(); + log.info("Server terminated."); + } + } + + /** + * Starts the server asynchronously. To stop the server, see {@link #stop()}. + * + * @throws Exception if the server could not be successfully started. + */ + public void start() + throws Exception { + log.info("Starting server on port {}", server.getConnectors()[0].getPort()); + server.start(); + log.info("Server is running"); + } + + /** + * Stops the server. + * + * @throws Exception if the server could not be successfully stopped. 
+ */ + public void stop() + throws Exception { + log.info("Stopping server"); + server.stop(); + log.info("Server is stopped"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java new file mode 100644 index 0000000..e540635 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Job.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.model; + +import org.codehaus.jackson.annotate.JsonProperty; + + +/** + * The client view of a job. Includes the job name, id, and status. + * + * The minimum goal is to provide enough information to execute REST commands on that job, so the name and id are required. + */ +public class Job { + private String jobName; + private String jobId; + private JobStatus status = JobStatus.UNKNOWN; // started, stopped, or starting + private String statusDetail; // Detailed status e.g. for YARN it could be ACCEPTED, RUNNING, etc. 
+ + public Job(String jobName, String jobId) { + this.jobName = jobName; + this.jobId = jobId; + } + + public Job(@JsonProperty("jobName") String jobName, @JsonProperty("jobId")String jobId, @JsonProperty("status") JobStatus status, @JsonProperty("statusDetail") String statusDetail) { + this.jobName = jobName; + this.jobId = jobId; + this.status = status; + this.statusDetail = statusDetail; + } + + public Job() { + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public JobStatus getStatus() { + return status; + } + + public void setStatus(JobStatus status) { + this.status = status; + } + + public String getStatusDetail() { + return statusDetail; + } + + public void setStatusDetail(String statusDetail) { + this.statusDetail = statusDetail; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java b/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java new file mode 100644 index 0000000..8c655d8 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/model/JobStatus.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.model; + +/** + * The abstract status of the job, irrespective of the status in any underlying cluster management system (e.g. YARN). + * This status is the client view of the job status. + */ +public enum JobStatus { + + /** Job is in the process of starting but is not yet running. */ + STARTING("starting"), + + /** Job has been started. */ + STARTED("started"), + + /** Job has been stopped. */ + STOPPED("stopped"), + + /** Job status is unknown. */ + UNKNOWN("unknown"); + + private final String stringVal; + + JobStatus(final String stringVal) { + this.stringVal = stringVal; + } + + @Override + public String toString() { + return stringVal; + } + + public boolean hasBeenStarted() { + return !(this == STOPPED || this == UNKNOWN); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java new file mode 100644 index 0000000..1ac49ae --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationFinder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.proxy.installation; + +import java.util.Map; +import org.apache.samza.rest.proxy.job.JobInstance; + + +/** + * Finds all the installed jobs. For example, one implementation may take an installation root directory + * and scan all subdirectories for job installations. + * + * Provides a map from a {@link JobInstance} to its {@link InstallationRecord} based on the structure within the + * installation directory. Implementations of this interface should encapsulate any custom installation + * structure such that the resulting {@link InstallationRecord} simply contains the locations of the files + * needed to control the job. + */ +public interface InstallationFinder { + + /** + * @param jobInstance the job to check. + * @return <code>true</code> if a job with the specified name and id is installed on the local host. + */ + boolean isInstalled(JobInstance jobInstance); + + /** + * @return a map from each {@link JobInstance} to the corresponding {@link InstallationRecord} + * for each Samza installation found in the installRoot. 
+ */ + Map<JobInstance, InstallationRecord> getAllInstalledJobs(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java new file mode 100644 index 0000000..27b1ab8 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/InstallationRecord.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.proxy.installation; + +import org.apache.samza.rest.proxy.job.JobInstance; + + +/** + * Represents an installation of one Samza job instance on a file system. + * + * This class does not hard code any knowledge about the structure of a Samza job installation. Rather, it + * just points to the relevant paths within the installation. The structure is resolved by an implementation + * of the {@link InstallationFinder} interface. 
+ */ +public class InstallationRecord extends JobInstance { + + private final String rootPath; + private final String configFilePath; + private final String binPath; + + public InstallationRecord(String jobName, String jobId, String rootPath, String configFilePath, String binPath) { + super(jobName, jobId); + this.rootPath = rootPath; + this.configFilePath = configFilePath; + this.binPath = binPath; + } + + /** + * @return the path of the config file for the job. + */ + public String getConfigFilePath() { + return configFilePath; + } + + /** + * @return the path of the directory containing the scripts. + */ + public String getScriptFilePath() { + return binPath; + } + + /** + * @return the root path of the installed Samza job on the file system. This path may be in common with + * other job instances if, for example, there are multiple configs defining separate instances. + */ + public String getRootPath() { + return rootPath; + } + + @Override + public String toString() { + return String.format("Job %s installed at %s", super.toString(), getRootPath()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java new file mode 100644 index 0000000..0adad5b --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/installation/SimpleInstallationFinder.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.proxy.installation; + +import java.io.File; +import java.net.URI; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigFactory; +import org.apache.samza.config.JobConfig; +import org.apache.samza.rest.proxy.job.JobInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A simple default implementation of {@link InstallationFinder}. + * + * Assumes that one or more Samza jobs are contained in each sub directory of the provided installationsPath. + * Each sub directory is also expected to contian a bin directory and a config directory containing one or + * more job config files. + */ +public class SimpleInstallationFinder implements InstallationFinder { + private static final Logger log = LoggerFactory.getLogger(SimpleInstallationFinder.class); + + protected static final String BIN_SUBPATH = "bin"; + protected static final String CFG_SUBPATH = "config"; + + protected final String installationsPath; + protected final ConfigFactory jobConfigFactory; + + /** + * Required constructor. + * + * @param installationsPath the root path where all Samza jobs are installed. + * @param jobConfigFactory the {@link ConfigFactory} to use to read the job configs. 
+ */ + public SimpleInstallationFinder(String installationsPath, ConfigFactory jobConfigFactory) { + this.installationsPath = installationsPath; + this.jobConfigFactory = jobConfigFactory; + } + + @Override + public boolean isInstalled(JobInstance jobInstance) { + return getAllInstalledJobs().containsKey(jobInstance); + } + + @Override + public Map<JobInstance, InstallationRecord> getAllInstalledJobs() { + Map<JobInstance, InstallationRecord> installations = new HashMap<>(); + for (File jobInstallPath : new File(installationsPath).listFiles()) { + if (!jobInstallPath.isDirectory()) { + continue; + } + + findJobInstances(jobInstallPath, installations); + } + return installations; + } + + /** + * Finds all the job instances in the specified path and adds a corresponding {@link JobInstance} and + * {@link InstallationRecord} for each instance. + * + * @param jobInstallPath the path to search for job instances. + * @param jobs the map to which the job instances will be added. + */ + private void findJobInstances(final File jobInstallPath, final Map<JobInstance, InstallationRecord> jobs) { + try { + String jobInstallCanonPath = jobInstallPath.getCanonicalPath(); + File configPath = Paths.get(jobInstallCanonPath, CFG_SUBPATH).toFile(); + if (!(configPath.exists() && configPath.isDirectory())) { + log.debug("Config path not found: " + configPath); + return; + } + + for (File configFile : configPath.listFiles()) { + + if (configFile.isFile()) { + + String configFilePath = configFile.getCanonicalPath(); + Config config = jobConfigFactory.getConfig(new URI("file://" + configFilePath)); + + if (config.containsKey(JobConfig.JOB_NAME()) && config.containsKey(JobConfig.STREAM_JOB_FACTORY_CLASS())) { + + String jobName = config.get(JobConfig.JOB_NAME()); + String jobId = config.get(JobConfig.JOB_ID(), "1"); + JobInstance jobInstance = new JobInstance(jobName, jobId); + + if (jobs.containsKey(jobInstance)) { + throw new IllegalStateException( + String.format("Found more than one 
job config with jobName:%s and jobId:%s", jobName, jobId)); + } + InstallationRecord jobInstall = + new InstallationRecord(jobName, jobId, jobInstallCanonPath, configFilePath, getBinPath(jobInstallCanonPath)); + jobs.put(jobInstance, jobInstall); + } + } + } + } catch (Exception e) { + throw new SamzaException("Exception finding job instance in path: " + jobInstallPath, e); + } + } + + /** + * @param jobInstallPath the root path of the job installation. + * @return the bin directory within the job installation. + */ + private String getBinPath(String jobInstallPath) { + return Paths.get(jobInstallPath, BIN_SUBPATH).toString(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java new file mode 100644 index 0000000..bcc88d0 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.proxy.job; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ConfigFactory; +import org.apache.samza.config.factories.PropertiesConfigFactory; +import org.apache.samza.rest.model.Job; +import org.apache.samza.rest.model.JobStatus; +import org.apache.samza.rest.resources.JobsResourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Implements a subset of the {@link JobProxy} interface with the default, cluster-agnostic, + * implementations. Subclasses are expected to override these default methods where necessary. + */ +public abstract class AbstractJobProxy implements JobProxy { + private static final Logger log = LoggerFactory.getLogger(AbstractJobProxy.class); + + protected final JobsResourceConfig config; + + /** + * Creates a new JobProxy instance from the factory class specified in the config. + * + * @param config the config containing the job proxy factory property. + * @return the JobProxy produced by the factory. + */ + public static JobProxy fromFactory(JobsResourceConfig config) { + String jobProxyFactory = config.getJobProxyFactory(); + if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) { + try { + Class factoryCls = Class.forName(jobProxyFactory); + JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance(); + return factory.getJobProxy(config); + } catch (Exception e) { + throw new SamzaException(e); + } + } else { + throw new SamzaException("Missing config: " + JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY); + } + } + + /** + * Required constructor. + * + * @param config the config containing the installations path. 
+ */ + public AbstractJobProxy(JobsResourceConfig config) { + this.config = config; + } + + @Override + public List<Job> getAllJobStatuses() + throws IOException, InterruptedException { + List<Job> allJobs = new ArrayList<>(); + Collection<JobInstance> jobInstances = getAllJobInstances(); + for(JobInstance jobInstance : jobInstances) { + allJobs.add(new Job(jobInstance.getJobName(), jobInstance.getJobId())); + } + getJobStatusProvider().getJobStatuses(allJobs); + + return allJobs; + } + + /** + * Convenience method to get the Samza job status from the name and id. + * + * @param jobInstance the instance of the job. + * @return the current Samza status for the job. + * @throws IOException if there was a problem executing the command to get the status. + * @throws InterruptedException if the thread was interrupted while waiting for the status result. + */ + protected JobStatus getJobSamzaStatus(JobInstance jobInstance) + throws IOException, InterruptedException { + return getJobStatus(jobInstance).getStatus(); + } + + @Override + public Job getJobStatus(JobInstance jobInstance) + throws IOException, InterruptedException { + return getJobStatusProvider().getJobStatus(jobInstance); + } + + @Override + public boolean jobExists(JobInstance jobInstance) { + return getAllJobInstances().contains(jobInstance); + } + + /** + * @return the {@link ConfigFactory} to use to read job configuration files. + */ + protected ConfigFactory getJobConfigFactory() { + String configFactoryClassName = config.get(JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY); + if (configFactoryClassName == null) { + configFactoryClassName = PropertiesConfigFactory.class.getCanonicalName(); + log.warn("{} not specified. 
Defaulting to {}", JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName); + } + + try { + Class factoryCls = Class.forName(configFactoryClassName); + return (ConfigFactory) factoryCls.newInstance(); + } catch (Exception e) { + throw new SamzaException(e); + } + } + /** + * @return the {@link JobStatusProvider} to use in retrieving the job status. + */ + protected abstract JobStatusProvider getJobStatusProvider(); + + /** + * @return all available job instances. + */ + protected abstract Set<JobInstance> getAllJobInstances(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java new file mode 100644 index 0000000..97599c2 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobInstance.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.proxy.job; + +import com.google.common.base.Preconditions; + + +/** + * Allows us to encapsulate the jobName,jobId tuple as one entity. + */ +public class JobInstance { + + private final String jobName; + private final String jobId; + + /** + * Required constructor. + * + * @param jobName the name of the job. + * @param jobId the id of the job. + */ + public JobInstance(String jobName, String jobId) { + this.jobName = Preconditions.checkNotNull(jobName); + this.jobId = Preconditions.checkNotNull(jobId); + } + + /** + * @return the name of the job. + */ + public String getJobName() { + return jobName; + } + + /** + * @return the id of the job. + */ + public String getJobId() { + return jobId; + } + + @Override + public int hashCode() { + final int prime = 139; + int hash = prime * jobName.hashCode() + prime * jobId.hashCode(); + return hash; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof JobInstance)) { + return false; + } + + JobInstance otherJob = (JobInstance) other; + return this.jobName.equals(otherJob.jobName) && this.jobId.equals(otherJob.jobId); + } + + @Override + public String toString() { + return String.format("jobName:%s jobId:%s", jobName, jobId); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java new file mode 100644 index 0000000..7e168d7 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxy.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.proxy.job; + +import java.io.IOException; +import java.util.List; +import org.apache.samza.rest.model.Job; +import org.apache.samza.rest.model.JobStatus; + + +/** + * Job proxy is the primary abstraction used by the REST API to interact with jobs. + * + * Concrete implementations of this interface may vary in the APIs they use to discover, start, stop, and stat jobs, + * how to interact with the installed jobs on disk, etc. + */ +public interface JobProxy { + /** + * @param jobInstance the instance of the job + * @return true if the job exists and can be started, stopped, etc. + */ + boolean jobExists(JobInstance jobInstance); + + /** + * @return a {@link Job} for each Samza job instance installed on this host. + * @throws IOException if there was a problem executing the command to get the status. + * @throws InterruptedException if the thread was interrupted while waiting for the status result. + */ + List<Job> getAllJobStatuses() + throws IOException, InterruptedException; + + /** + * @param jobInstance the instance of the job for which the status is needed. + * @return a {@link Job} containing + * the status for the job specified by jobName and jobId. + * @throws IOException if there was a problem executing the command to get the status. + * @throws InterruptedException if the thread was interrupted while waiting for the status result. 
+ */ + Job getJobStatus(JobInstance jobInstance) + throws IOException, InterruptedException; + + /** + * Starts the job instance specified by jobName and jobId. When this method returns, the status of the job + * should be {@link JobStatus#STARTING} or + * {@link JobStatus#STARTED} depending on the implementation. + * + * @param jobInstance the instance of the job to start. + * @throws Exception if the job could not be successfully started. + */ + void start(JobInstance jobInstance) + throws Exception; + + /** + * Stops the job instance specified by jobName and jobId. When this method returns, the status of the job + * should be {@link JobStatus#STOPPED}. + * + * @param jobInstance the instance of the job to stop. + * @throws Exception if the job could not be successfully stopped. + */ + void stop(JobInstance jobInstance) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java new file mode 100644 index 0000000..067711a --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * Simple factory interface to produce instances of {@link JobProxy},
+ * depending on the implementation.
+ *
+ * To use a custom {@link JobProxy}, create an implementation of that interface, an implementation
+ * of this interface which instantiates the custom proxy and finally reference the custom factory
+ * in the config {@link JobsResourceConfig#CONFIG_JOB_PROXY_FACTORY}.
+ */
+public interface JobProxyFactory {
+
+  /**
+   * Creates a new {@link JobProxy} and initializes it with the specified config.
+   *
+   * @param config the {@link JobsResourceConfig} to pass to the proxy.
+   * @return the created proxy.
+   */
+  JobProxy getJobProxy(JobsResourceConfig config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
new file mode 100644
index 0000000..23a7f73
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobStatusProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.samza.rest.model.Job;
+
+
+/**
+ * Interface for getting job status independent of the underlying cluster implementation.
+ */
+public interface JobStatusProvider {
+  /**
+   * Populates the status* fields of each {@link Job} in the provided Collection.
+   *
+   * @param jobs the collection of {@link Job} for which the status is needed.
+   * @throws IOException if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException;
+
+  /**
+   * @param jobInstance the instance of the job.
+   * @return a {@link Job} containing
+   * the status of the specified job instance.
+   * @throws IOException if there was a problem executing the command to get the status.
+   * @throws InterruptedException if the thread was interrupted while waiting for the status result.
+   */
+  Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
new file mode 100644
index 0000000..2d14366
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/ScriptJobProxy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.Paths;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.rest.script.ScriptPathProvider;
+import org.apache.samza.rest.script.ScriptRunner;
+
+/**
+ * Extends {@link AbstractJobProxy} with some script support functionality.
+ */
+public abstract class ScriptJobProxy extends AbstractJobProxy implements ScriptPathProvider {
+
+  protected final ScriptRunner scriptRunner = new ScriptRunner();
+
+  /**
+   * Required constructor.
+   *
+   * @param config the config which specifies the path to the Samza framework installation.
+   */
+  public ScriptJobProxy(JobsResourceConfig config) {
+    super(config);
+  }
+
+  /**
+   * Constructs the path to the specified script within the job installation.
+   *
+   * @param jobInstance the instance of the job.
+   * @param scriptName the name of the script.
+   * @return the full path to the script.
+   * @throws FileNotFoundException if the job has no installation record or the script does not exist in it.
+   */
+  public String getScriptPath(JobInstance jobInstance, String scriptName)
+      throws FileNotFoundException {
+    InstallationRecord jobInstallation = getInstallationFinder().getAllInstalledJobs().get(jobInstance);
+    if (jobInstallation == null) {
+      throw new FileNotFoundException("Job installation does not exist: " + jobInstance);
+    }
+    String scriptPath = Paths.get(jobInstallation.getScriptFilePath(), scriptName).toString();
+    if (!new File(scriptPath).exists()) {
+      throw new FileNotFoundException("Script does not exist: " + scriptPath);
+    }
+    return scriptPath;
+  }
+
+  /**
+   * @return the {@link InstallationFinder} which will be used to find jobs installed on this machine.
+   */
+  protected abstract InstallationFinder getInstallationFinder();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
new file mode 100644
index 0000000..a935c98
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Extends the {@link ScriptJobProxy} with methods specific to simple Samza deployments.
+ */
+public class SimpleYarnJobProxy extends ScriptJobProxy {
+  private static final Logger log = LoggerFactory.getLogger(SimpleYarnJobProxy.class);
+
+  private static final String START_SCRIPT_NAME = "run-job.sh";
+  private static final String STOP_SCRIPT_NAME = "kill-yarn-job-by-name.sh";
+
+  private static final String CONFIG_FACTORY_PARAM = "--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory";
+  private static final String CONFIG_PATH_PARAM_FORMAT = "--config-path=file://%s";
+
+  private final JobStatusProvider statusProvider = new YarnCliJobStatusProvider(this);
+
+  private final InstallationFinder installFinder;
+
+  /**
+   * @param config the config providing the installations path used to find installed jobs.
+   */
+  public SimpleYarnJobProxy(JobsResourceConfig config) {
+    super(config);
+
+    installFinder = new SimpleInstallationFinder(config.getInstallationsPath(), getJobConfigFactory());
+  }
+
+  @Override
+  public void start(JobInstance jobInstance)
+      throws Exception {
+    JobStatus currentStatus = getJobSamzaStatus(jobInstance);
+    if (currentStatus.hasBeenStarted()) {
+      log.info("Job {} will not be started because it is currently {}.", jobInstance, currentStatus);
+      return;
+    }
+
+    String scriptPath = getScriptPath(jobInstance, START_SCRIPT_NAME);
+    int resultCode = scriptRunner.runScript(scriptPath, CONFIG_FACTORY_PARAM,
+        generateConfigPathParameter(jobInstance));
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to start job. Result code: " + resultCode);
+    }
+  }
+
+  @Override
+  public void stop(JobInstance jobInstance)
+      throws Exception {
+    JobStatus currentStatus = getJobSamzaStatus(jobInstance);
+    if (!currentStatus.hasBeenStarted()) {
+      log.info("Job {} will not be stopped because it is currently {}.", jobInstance, currentStatus);
+      return;
+    }
+
+    String scriptPath = getScriptPath(jobInstance, STOP_SCRIPT_NAME);
+    int resultCode = scriptRunner.runScript(scriptPath, YarnCliJobStatusProvider.getQualifiedJobName(jobInstance));
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to stop job. Result code: " + resultCode);
+    }
+  }
+
+  /**
+   * Generates the command line argument which specifies the path to the config file for the job.
+   *
+   * @param jobInstance the instance of the job.
+   * @return the --config-path command line argument.
+   * @throws SamzaException if no installation record exists for the job.
+   */
+  private String generateConfigPathParameter(JobInstance jobInstance) {
+    InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
+    if (record == null) {
+      // Fail with a descriptive error rather than a NullPointerException on the missing record.
+      throw new SamzaException("Job installation does not exist: " + jobInstance);
+    }
+    return String.format(CONFIG_PATH_PARAM_FORMAT, record.getConfigFilePath());
+  }
+
+  /**
+   * @return the {@link JobStatusProvider} to use for retrieving job status.
+   */
+  public JobStatusProvider getJobStatusProvider() {
+    return statusProvider;
+  }
+
+  @Override
+  protected Set<JobInstance> getAllJobInstances() {
+    return installFinder.getAllInstalledJobs().keySet();
+  }
+
+  @Override
+  protected InstallationFinder getInstallationFinder() {
+    return installFinder;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
new file mode 100644
index 0000000..11d93d4
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * Factory to produce {@link SimpleYarnJobProxy} instances.
+ *
+ * See {@link AbstractJobProxy#fromFactory(org.apache.samza.rest.resources.JobsResourceConfig)}
+ */
+public class SimpleYarnJobProxyFactory implements JobProxyFactory {
+
+  @Override
+  public JobProxy getJobProxy(JobsResourceConfig config) {
+    return new SimpleYarnJobProxy(config);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/260d1ff9/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
new file mode 100644
index 0000000..d1f34e8
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnCliJobStatusProvider.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.script.ScriptOutputHandler;
+import org.apache.samza.rest.script.ScriptPathProvider;
+import org.apache.samza.rest.script.ScriptRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the {@link JobStatusProvider} that retrieves
+ * the job status from the YARN command line interface.
+ */
+public class YarnCliJobStatusProvider implements JobStatusProvider {
+  private static final Logger log = LoggerFactory.getLogger(YarnCliJobStatusProvider.class);
+  private static final String JOB_NAME_ID_FORMAT = "%s_%s";
+  private static final String APPLICATION_PREFIX = "application_";
+  // Zero-based indices of the name and state columns in a whitespace-split "application -list" row.
+  private static final int NAME_COLUMN = 1;
+  private static final int STATE_COLUMN = 5;
+  private final ScriptPathProvider scriptPathProvider;
+
+  /**
+   * Constructs the job name used in YARN. This is the value shown in the "Name"
+   * column of the Resource Manager UI.
+   *
+   * @param jobInstance the instance of the job.
+   * @return the job name to use for the job in YARN.
+   */
+  public static String getQualifiedJobName(JobInstance jobInstance) {
+    return String.format(JOB_NAME_ID_FORMAT, jobInstance.getJobName(), jobInstance.getJobId());
+  }
+
+  /**
+   * Default constructor.
+   *
+   * @param provider a delegate that provides the path to the Samza yarn scripts.
+   */
+  public YarnCliJobStatusProvider(ScriptPathProvider provider) {
+    scriptPathProvider = provider;
+  }
+
+  @Override
+  public void getJobStatuses(Collection<Job> jobs)
+      throws IOException, InterruptedException {
+    if (jobs == null || jobs.isEmpty()) {
+      return;
+    }
+
+    // If the scripts are in the jobs, they will be in all job installations, so just pick one and get the script path.
+    Job anyJob = jobs.iterator().next();
+    String scriptPath = scriptPathProvider.getScriptPath(new JobInstance(anyJob.getJobName(), anyJob.getJobId()), "run-class.sh");
+
+    // We will identify jobs returned by the YARN application states by their qualified names, so build a map
+    // to translate back from that name to the JobInfo we wish to populate. This avoids parsing/delimiter issues.
+    final Map<String, Job> qualifiedJobToInfo = new HashMap<>();
+    for (Job job : jobs) {
+      qualifiedJobToInfo.put(getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId())), job);
+    }
+
+    // Run "application -list" command and get the YARN state for each application
+    ScriptRunner runner = new ScriptRunner();
+    int resultCode = runner.runScript(scriptPath, new ScriptOutputHandler() {
+      @Override
+      public void processScriptOutput(InputStream output)
+          throws IOException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(output));
+        String line;
+        log.debug("YARN status:");
+        while ((line = br.readLine()) != null) {
+          log.debug(line);
+          if (line.startsWith(APPLICATION_PREFIX)) {
+            updateJobStatus(line, qualifiedJobToInfo);
+          }
+        }
+      }
+    }, "org.apache.hadoop.yarn.client.cli.ApplicationCLI", "application", "-list", "-appStates", "ALL");
+
+    if (resultCode != 0) {
+      throw new SamzaException("Failed to get job status. Result code: " + resultCode);
+    }
+  }
+
+  @Override
+  public Job getJobStatus(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Job info = new Job(jobInstance.getJobName(), jobInstance.getJobId());
+    getJobStatuses(Collections.singletonList(info));
+    return info;
+  }
+
+  /**
+   * Applies one application row of the YARN CLI output to the matching requested {@link Job}, if there is one.
+   *
+   * Rows with too few columns, or whose state is not a {@link YarnApplicationState}, are logged and skipped
+   * so that one unexpected row does not fail the status request for every job.
+   * NOTE: rows are split on whitespace, which assumes YARN application names contain no whitespace.
+   *
+   * @param line               a line of CLI output that starts with the YARN application id prefix.
+   * @param qualifiedJobToInfo the requested jobs, keyed by {@link #getQualifiedJobName(JobInstance)}.
+   */
+  private void updateJobStatus(String line, Map<String, Job> qualifiedJobToInfo) {
+    String[] columns = line.split("\\s+");
+    if (columns.length <= STATE_COLUMN) {
+      log.warn("Skipping unparseable YARN application row: {}", line);
+      return;
+    }
+
+    // If job is null, the application wasn't requested.
+    Job job = qualifiedJobToInfo.get(columns[NAME_COLUMN]);
+    if (job == null) {
+      return;
+    }
+
+    String yarnState = columns[STATE_COLUMN];
+    JobStatus samzaStatus;
+    try {
+      samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnState.toUpperCase(Locale.ROOT)));
+    } catch (IllegalArgumentException e) {
+      log.warn("Skipping application {} with unrecognized YARN state {}", columns[NAME_COLUMN], yarnState);
+      return;
+    }
+
+    // There could be many application attempts in the STOPPED status, so take the first state seen for the job
+    // and then only let a non-STOPPED state overwrite it.
+    if (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED) {
+      job.setStatusDetail(yarnState);
+      job.setStatus(samzaStatus);
+    }
+  }
+
+  /**
+   * Translates the YARN application state to the more generic Samza job status.
+   *
+   * @param yarnState the YARN application state to translate.
+   * @return the corresponding Samza job status.
+   */
+  private JobStatus yarnStateToSamzaStatus(YarnApplicationState yarnState) {
+    switch (yarnState) {
+      case RUNNING:
+        return JobStatus.STARTED;
+      case NEW:
+      case NEW_SAVING:
+      case SUBMITTED:
+      case ACCEPTED:
+        return JobStatus.STARTING;
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+      default:
+        return JobStatus.STOPPED;
+    }
+  }
+}
