Repository: apex-core Updated Branches: refs/heads/master 5fb9d045d -> cf8141846
APEXCORE-405 API to launch app on local mode or cluster Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/7ad1d75d Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/7ad1d75d Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/7ad1d75d Branch: refs/heads/master Commit: 7ad1d75db581641916d3c7f68fb9dcd145d8cc66 Parents: 81b8c92 Author: Pramod Immaneni <[email protected]> Authored: Fri Jun 24 00:44:57 2016 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Mon Nov 21 08:43:17 2016 -0800 ---------------------------------------------------------------------- .idea/codeStyleSettings.xml | 2 +- .../java/com/datatorrent/api/LocalMode.java | 41 +--- .../apache/apex/api/EmbeddedAppLauncher.java | 94 +++++++++ .../main/java/org/apache/apex/api/Launcher.java | 192 +++++++++++++++++++ .../org/apache/apex/api/YarnAppLauncher.java | 88 +++++++++ .../com/datatorrent/stram/LocalModeImpl.java | 103 ---------- .../java/com/datatorrent/stram/StramUtils.java | 23 ++- .../java/com/datatorrent/stram/cli/ApexCli.java | 5 +- .../stram/client/StramAppLauncher.java | 28 +-- .../apex/engine/EmbeddedAppLauncherImpl.java | 173 +++++++++++++++++ .../apache/apex/engine/YarnAppLauncherImpl.java | 148 ++++++++++++++ .../apex/engine/util/StreamingAppFactory.java | 64 +++++++ .../services/com.datatorrent.api.LocalMode | 2 +- .../org.apache.apex.api.EmbeddedAppLauncher | 1 + .../org.apache.apex.api.YarnAppLauncher | 1 + 15 files changed, 796 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/.idea/codeStyleSettings.xml ---------------------------------------------------------------------- diff --git a/.idea/codeStyleSettings.xml b/.idea/codeStyleSettings.xml index 8dfc54e..7b75d12 100644 --- a/.idea/codeStyleSettings.xml +++ b/.idea/codeStyleSettings.xml @@ -105,4 +105,4 @@ </option> <option name="USE_PER_PROJECT_SETTINGS" value="true" /> </component> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/com/datatorrent/api/LocalMode.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/LocalMode.java b/api/src/main/java/com/datatorrent/api/LocalMode.java index 51d3da3..7d6f1ee 100644 --- a/api/src/main/java/com/datatorrent/api/LocalMode.java +++ b/api/src/main/java/com/datatorrent/api/LocalMode.java @@ -18,17 +18,17 @@ */ package com.datatorrent.api; -import java.util.Iterator; -import java.util.ServiceLoader; - +import org.apache.apex.api.EmbeddedAppLauncher; import org.apache.hadoop.conf.Configuration; /** * Local mode execution for single application * + * @deprecated * @since 0.3.2 */ -public abstract class LocalMode +@Deprecated +public abstract class LocalMode<H extends EmbeddedAppLauncher.EmbeddedAppHandle> extends EmbeddedAppLauncher<H> { /** @@ -96,38 +96,7 @@ public abstract class LocalMode */ public static LocalMode newInstance() { - ServiceLoader<LocalMode> loader = ServiceLoader.load(LocalMode.class); - Iterator<LocalMode> impl = loader.iterator(); - if (!impl.hasNext()) { - throw new RuntimeException("No implementation for " + LocalMode.class); - } - return impl.next(); - } - - /** - * Shortcut to run an application. Used for testing. - * - * @param app - * @param runMillis - */ - public static void runApp(StreamingApplication app, int runMillis) - { - runApp(app, null, runMillis); - } - - /** - * Shortcut to run an application with the modified configuration. - * - * @param app - Application to be run - * @param configuration - Configuration - * @param runMillis - The time after which the application will be shutdown; pass 0 to run indefinitely. - */ - public static void runApp(StreamingApplication app, Configuration configuration, int runMillis) - { - LocalMode lma = newInstance(); - app.populateDAG(lma.getDAG(), configuration == null ? new Configuration(false) : configuration); - LocalMode.Controller lc = lma.getController(); - lc.run(runMillis); + return loadService(LocalMode.class); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java new file mode 100644 index 0000000..8e3e0f6 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +/** + * Launcher for running the application directly in the current Java VM. For basic operations such as launching or + * stopping the application, {@link Launcher} can be used directly. + */ +public abstract class EmbeddedAppLauncher<H extends EmbeddedAppLauncher.EmbeddedAppHandle> extends Launcher<H> +{ + /** + * Parameter to specify the time after which the application will be shutdown; pass 0 to run indefinitely. + */ + public static final Attribute<Long> RUN_MILLIS = new Attribute<Long>(0L); + + /** + * Parameter to launch application asynchronously and return from launch immediately. + */ + public static final Attribute<Boolean> RUN_ASYNC = new Attribute<Boolean>(false); + + /** + * Parameter to enable or disable heartbeat monitoring. + */ + public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<Boolean>(true); + + /** + * Parameter to serialize DAG before launch. + */ + public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<Boolean>(false); + + static { + Attribute.AttributeMap.AttributeInitializer.initialize(LocalMode.class); + } + + public static EmbeddedAppLauncher newInstance() + { + return loadService(EmbeddedAppLauncher.class); + } + + /** + * The EmbeddedAppHandle class would be useful in future to provide additional information without breaking backwards + * compatibility of the launchApp method + */ + public interface EmbeddedAppHandle extends AppHandle {} + + /** + * Shortcut to run an application. Used for testing. + * + * @param app + * @param runMillis + */ + public static void runApp(StreamingApplication app, int runMillis) + { + runApp(app, null, runMillis); + } + + /** + * Shortcut to run an application with the modified configuration. + * + * @param app - Application to be run + * @param configuration - Configuration + * @param runMillis - The time after which the application will be shutdown; pass 0 to run indefinitely. + */ + public static void runApp(StreamingApplication app, Configuration configuration, int runMillis) + { + EmbeddedAppLauncher launcher = newInstance(); + Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + launchAttributes.put(RUN_MILLIS, (long)runMillis); + launcher.launchApp(app, configuration, launchAttributes); + } + +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/Launcher.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/Launcher.java b/api/src/main/java/org/apache/apex/api/Launcher.java new file mode 100644 index 0000000..14c365a --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/Launcher.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.ServiceLoader; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.StreamingApplication; + +/** + * A class that provides an entry point for functionality to run applications in different environments such as current + * Java VM, Hadoop YARN etc. + */ +public abstract class Launcher<H extends Launcher.AppHandle> +{ + + public static final String NEW_INSTANCE_METHOD = "newInstance"; + + /** + * Denotes an environment in which to launch the application. Also, contains the list of supported environments. + * @param <L> The launcher for the specific environment + */ + public static class LaunchMode<L extends Launcher> + { + /** + * Launch application in the current Java VM + */ + public static final LaunchMode<EmbeddedAppLauncher> EMBEDDED = new LaunchMode<>(EmbeddedAppLauncher.class); + /** + * Launch application on Hadoop YARN + */ + public static final LaunchMode<YarnAppLauncher> YARN = new LaunchMode<>(YarnAppLauncher.class); + + Class<L> clazz; + + public LaunchMode(Class<L> clazz) + { + this.clazz = clazz; + } + } + + /** + * Specifies the manner in which a running application be stopped. + */ + public enum ShutdownMode + { + /** + * Shutdown the application in an orderly fashion and wait till it stops running + */ + AWAIT_TERMINATION, + /** + * Kill the application immediately + */ + KILL + } + + // Marker interface + public interface AppHandle {} + + /** + * Get a launcher instance.<br><br> + * + * Returns a launcher specific to the given launch mode. This allows the user to also use custom methods supported by + * the specific launcher along with the basic launch methods from this class. + * + * @param launchMode - The launch mode to use + * + * @return The launcher + */ + public static <L extends Launcher<?>> L getLauncher(LaunchMode<L> launchMode) + { + L launcher; + // If the static method for creating a new instance is present in the launcher, it is invoked to create an instance. + // This gives an opportunity for the launcher to do something custom when creating an instance. If the method is not + // present, the service is loaded from the class name. A factory approach would be cleaner and type safe but adds + // unnecessary complexity, going with the static method for now. + try { + Method m = launchMode.clazz.getDeclaredMethod(NEW_INSTANCE_METHOD); + launcher = (L)m.invoke(null); + } catch (NoSuchMethodException e) { + launcher = loadService(launchMode.clazz); + } catch (InvocationTargetException | IllegalAccessException e) { + throw Throwables.propagate(e); + } + return launcher; + } + + /** + * Launch application with configuration.<br><br> + * + * Launch the given streaming application with the given configuration. + * + * @param application - Application to be run + * @param configuration - Application Configuration + * + * @return The application handle + */ + public H launchApp(StreamingApplication application, Configuration configuration) throws LauncherException + { + return launchApp(application, configuration, null); + } + + /** + * Launch application with configuration and launch parameters. + * + * Launch the given streaming application with the given configuration and parameters. The parameters should be from + * the list of parameters supported by the launcher. To find out more about the supported parameters look at the + * documentation of the individual launcher. + * + * @param application - Application to be run + * @param configuration - Application Configuration + * @param launchParameters - Launch Parameters + * + * @return The application handle + */ + public abstract H launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws LauncherException; + + /** + * Shutdown the application and await termination. + * Also see {@link #shutdownApp(AppHandle, ShutdownMode)} + * + * @param app The application handle + */ + public void shutdownApp(H app) throws LauncherException + { + shutdownApp(app, ShutdownMode.AWAIT_TERMINATION); + } + + /** + * Shutdown the application. + * + * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the + * application. + * + * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion + * and wait till termination. If the application does not terminate in a reasonable amount of time the + * implementation can forcibly terminate the application. + * + * If the mode is KILL, the application can be killed immediately. + * + * @param app The application handle + * @param shutdownMode The shutdown mode + */ + public abstract void shutdownApp(H app, ShutdownMode shutdownMode) throws LauncherException; + + protected static <T> T loadService(Class<T> clazz) + { + ServiceLoader<T> loader = ServiceLoader.load(clazz); + Iterator<T> impl = loader.iterator(); + if (!impl.hasNext()) { + throw new RuntimeException("No implementation for " + clazz); + } + return impl.next(); + } + + public static class LauncherException extends RuntimeException + { + public LauncherException(String message) + { + super(message); + } + + public LauncherException(Throwable cause) + { + super(cause); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java new file mode 100644 index 0000000..e3b36db --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.StringCodec; + +/** + * Launcher for running the application on Hadoop YARN. For basic operations such as launching or stopping the + * application, {@link Launcher} can be used directly. + */ +public abstract class YarnAppLauncher<H extends YarnAppLauncher.YarnAppHandle> extends Launcher<H> +{ + + /** + * Parameter to specify extra jars for launch. + */ + public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String()); + + /** + * Parameter to specify the previous application id to use to resume launch from. + */ + public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String()); + + /** + * Parameter to specify the queue name to use for launch. + */ + public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String()); + + static { + Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class); + } + + public static YarnAppLauncher newInstance() + { + return loadService(YarnAppLauncher.class); + } + + public interface YarnAppHandle extends AppHandle + { + String getApplicationId(); + } + + /** + * Shortcut to run an application with the modified configuration. + * + * @param app - Application to be run + * @param configuration - Application Configuration + */ + public static void runApp(StreamingApplication app, Configuration configuration) throws LauncherException + { + runApp(app, configuration, null); + } + + /** + * Shortcut to run an application with the modified configuration. + * + * @param app - Application to be run + * @param configuration - Application Configuration + * @param launchAttributes - Launch Configuration + */ + public static void runApp(StreamingApplication app, Configuration configuration, Attribute.AttributeMap launchAttributes) throws LauncherException + + { + YarnAppLauncher launcher = newInstance(); + launcher.launchApp(app, configuration, launchAttributes); + } + +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java deleted file mode 100644 index 3fedc7c..0000000 --- a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.stram; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.stram.plan.logical.LogicalPlan; -import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; - -/** - * <p>LocalModeImpl class.</p> - * - * @since 0.3.2 - */ -public class LocalModeImpl extends LocalMode -{ - private final LogicalPlan lp = new LogicalPlan(); - - @Override - public DAG getDAG() - { - return lp; - } - - @Override - public DAG cloneDAG() throws Exception - { - return StramLocalCluster.cloneLogicalPlan(lp); - } - - @Override - public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception - { - if (app == null && conf == null) { - throw new IllegalArgumentException("Require app or configuration to populate logical plan."); - } - if (conf == null) { - conf = new Configuration(false); - } - LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); - String appName = app != null ? app.getClass().getName() : "unknown"; - lpc.prepareDAG(lp, app, appName); - return lp; - } - - @Override - public Controller getController() - { - try { - addLibraryJarsToClasspath(lp); - return new StramLocalCluster(lp); - } catch (Exception e) { - throw new RuntimeException("Error creating local cluster", e); - } - } - - private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException - { - String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS); - - if (libJarsCsv != null && libJarsCsv.length() != 0) { - String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP); - if (split.length != 0) { - URL[] urlList = new URL[split.length]; - for (int i = 0; i < split.length; i++) { - File file = new File(split[i]); - urlList[i] = file.toURI().toURL(); - } - - // Set class loader. - ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); - URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl); - Thread.currentThread().setContextClassLoader(cl); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/StramUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java index 2b2baa6..99f7bd4 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java @@ -18,7 +18,8 @@ */ package com.datatorrent.stram; - +import java.io.IOException; +import java.util.List; import java.util.Map; import org.codehaus.jettison.json.JSONArray; @@ -28,10 +29,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.datatorrent.api.Attribute; import com.datatorrent.api.StreamingApplication; import com.datatorrent.stram.util.LoggerUtil; @@ -148,4 +154,19 @@ public abstract class StramUtils return jsonObject; } + + public static <T> T getValueWithDefault(Attribute.AttributeMap map, Attribute<T> key) + { + T value = map.get(key); + if (value == null) { + value = key.defaultValue; + } + return value; + } + + public static List<ApplicationReport> getApexApplicationList(YarnClient yarnClient) throws IOException, YarnException + { + return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED)); + } + } http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 5cfde36..422f0c6 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -89,14 +89,13 @@ import org.apache.log4j.Level; import org.apache.tools.ant.DirectoryScanner; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; import com.sun.jersey.api.client.WebResource; import com.datatorrent.api.Context; import com.datatorrent.api.DAG.GenericOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; -import com.datatorrent.stram.StramClient; +import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.client.AppPackage; import com.datatorrent.stram.client.AppPackage.AppInfo; import com.datatorrent.stram.client.ConfigPackage; @@ -1593,7 +1592,7 @@ public class ApexCli private List<ApplicationReport> getApplicationList() { try { - return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED)); + return StramUtils.getApexApplicationList(yarnClient); } catch (Exception e) { throw new CliException("Error getting application list from resource manager", e); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 961a97b..216771d 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -44,6 +44,7 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.engine.util.StreamingAppFactory; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.NotImplementedException; @@ -62,7 +63,6 @@ import com.google.common.collect.Sets; import com.datatorrent.api.Context; import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.stram.StramClient; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.StramUtils; @@ -462,36 +462,16 @@ public class StramAppLauncher try { final Class<?> clazz = cl.loadClass(className); if (!Modifier.isAbstract(clazz.getModifiers()) && StreamingApplication.class.isAssignableFrom(clazz)) { - final AppFactory appConfig = new AppFactory() + final AppFactory appConfig = new StreamingAppFactory(classFileName, clazz) { @Override - public String getName() - { - return classFileName; - } - - @Override - public String getDisplayName() - { - ApplicationAnnotation an = clazz.getAnnotation(ApplicationAnnotation.class); - if (an != null) { - return an.name(); - } else { - return classFileName; - } - } - - @Override - public LogicalPlan createApp(LogicalPlanConfiguration conf) + public LogicalPlan createApp(LogicalPlanConfiguration planConfig) { // load class from current context class loader Class<? extends StreamingApplication> c = StramUtils.classForName(className, StreamingApplication.class); StreamingApplication app = StramUtils.newInstance(c); - LogicalPlan dag = new LogicalPlan(); - conf.prepareDAG(dag, app, getName()); - return dag; + return super.createApp(app, planConfig); } - }; appResourceList.add(appConfig); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java new file mode 100644 index 0000000..9ace9b5 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; + +import org.apache.apex.api.EmbeddedAppLauncher; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.stram.StramClient; +import com.datatorrent.stram.StramLocalCluster; +import com.datatorrent.stram.StramUtils; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + +/** + * An implementation of {@link EmbeddedAppLauncher} to launch applications directly in the current Java VM. + * + * TODO: When LocalMode is removed, make this class extend EmbeddedAppLauncher directly + * @since 0.3.2 + */ +public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.EmbeddedAppHandleImpl> +{ + private final LogicalPlan lp = new LogicalPlan(); + + @Override + public DAG getDAG() + { + return lp; + } + + @Override + public DAG cloneDAG() throws Exception + { + return StramLocalCluster.cloneLogicalPlan(lp); + } + + @Override + public EmbeddedAppHandleImpl launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap + launchParameters) throws LauncherException + { + try { + prepareDAG(application, configuration); + } catch (Exception e) { + throw new LauncherException(e); + } + LocalMode.Controller lc = getController(); + boolean launched = false; + if (launchParameters != null) { + if (StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)) { + // Check if DAG can be serialized + try { + cloneDAG(); + } catch (Exception e) { + throw new LauncherException(e); + } + } + if (StramUtils.getValueWithDefault(launchParameters, HEARTBEAT_MONITORING)) { + lc.setHeartbeatMonitoringEnabled(true); + } + if (StramUtils.getValueWithDefault(launchParameters, RUN_ASYNC)) { + lc.runAsync(); + launched = true; + } else { + Long runMillis = StramUtils.getValueWithDefault(launchParameters, RUN_MILLIS); + if (runMillis != null) { + lc.run(runMillis); + launched = true; + } + } + } + if (!launched) { + lc.run(); + } + return new EmbeddedAppHandleImpl(lc); + } + + @Override + public void shutdownApp(EmbeddedAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException + { + if (shutdownMode != ShutdownMode.KILL) { + app.controller.shutdown(); + } else { + throw new UnsupportedOperationException("Kill not supported"); + } + } + + @Override + public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception + { + if (app == null && conf == null) { + throw new IllegalArgumentException("Require app or configuration to populate logical plan."); + } + if (conf == null) { + conf = new Configuration(false); + } + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); + String appName = app != null ? app.getClass().getName() : "unknown"; + lpc.prepareDAG(lp, app, appName); + return lp; + } + + @Override + public Controller getController() + { + try { + addLibraryJarsToClasspath(lp); + return new StramLocalCluster(lp); + } catch (Exception e) { + throw new RuntimeException("Error creating local cluster", e); + } + } + + private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException + { + String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS); + + if (libJarsCsv != null && libJarsCsv.length() != 0) { + String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP); + if (split.length != 0) { + URL[] urlList = new URL[split.length]; + for (int i = 0; i < split.length; i++) { + File file = new File(split[i]); + urlList[i] = file.toURI().toURL(); + } + + // Set class loader. + ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl); + Thread.currentThread().setContextClassLoader(cl); + } + } + + } + + /** + * + */ + public static class EmbeddedAppHandleImpl implements EmbeddedAppLauncher.EmbeddedAppHandle + { + Controller controller; + + public EmbeddedAppHandleImpl(Controller controller) + { + this.controller = controller; + } + + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java new file mode 100644 index 0000000..4f5c8c8 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.apex.api.YarnAppLauncher; +import org.apache.apex.engine.util.StreamingAppFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.stram.StramUtils; +import com.datatorrent.stram.client.StramAppLauncher; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + +/** + * An implementation of {@link YarnAppLauncher} to launch applications on Hadoop YARN. + */ +public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.YarnAppHandleImpl> +{ + + private static final Map<Attribute<?>, String> propMapping = new HashMap<>(); + + static { + propMapping.put(YarnAppLauncher.LIB_JARS, StramAppLauncher.LIBJARS_CONF_KEY_NAME); + propMapping.put(YarnAppLauncher.ORIGINAL_APP_ID, StramAppLauncher.ORIGINAL_APP_ID); + propMapping.put(YarnAppLauncher.QUEUE_NAME, StramAppLauncher.QUEUE_NAME); + } + + public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException + { + if (launchParameters != null) { + for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) { + String property = propMapping.get(entry.getKey()); + if (property != null) { + setConfiguration(conf, property, entry.getValue()); + } + } + } + try { + String name = app.getClass().getName(); + StramAppLauncher appLauncher = new StramAppLauncher(name, conf); + appLauncher.loadDependencies(); + StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass()) + { + @Override + public LogicalPlan createApp(LogicalPlanConfiguration planConfig) + { + return super.createApp(app, planConfig); + } + }; + ApplicationId appId = appLauncher.launchApp(appFactory); + return new YarnAppHandleImpl(appId); + } catch (Exception ex) { + throw new LauncherException(ex); + } + } + + @Override + public void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException + { + if (shutdownMode == ShutdownMode.KILL) { + YarnClient yarnClient = YarnClient.createYarnClient(); + try { + String appId = app.getApplicationId(); + ApplicationId applicationId = null; + List<ApplicationReport> applications = StramUtils.getApexApplicationList(yarnClient); + for (ApplicationReport application : applications) { + if (application.getApplicationId().toString().equals(appId)) { + applicationId = application.getApplicationId(); + break; + } + } + if (applicationId == null) { + throw new LauncherException("Application " + appId + " not found"); + } + yarnClient.killApplication(applicationId); + } catch (YarnException | IOException e) { + throw Throwables.propagate(e); + } + } else { + throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead"); + } + } + + private void setConfiguration(Configuration conf, String property, Object value) + { + if (value instanceof Integer) { + conf.setInt(property, (Integer)value); + } else if (value instanceof Boolean) { + conf.setBoolean(property, (Boolean)value); + } else if (value instanceof Long) { + conf.setLong(property, (Long)value); + } else if (value instanceof Float) { + conf.setFloat(property, (Float)value); + } else if (value instanceof Double) { + conf.setDouble(property, (Double)value); + } else { + conf.set(property, value.toString()); + } + } + + /** + * + */ + public static class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle + { + ApplicationId appId; + + public YarnAppHandleImpl(ApplicationId appId) + { + this.appId = appId; + } + + @Override + public String getApplicationId() + { + return appId.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java new file mode 100644 index 0000000..6c943fc --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.util; + +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.stram.client.StramAppLauncher; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + +/** + * + */ +public abstract class StreamingAppFactory implements StramAppLauncher.AppFactory +{ + private Class<?> appClazz; + private String name; + + public StreamingAppFactory(String name, Class<?> appClazz) + { + this.name = name; + this.appClazz = appClazz; + } + + public abstract LogicalPlan createApp(LogicalPlanConfiguration planConfig); + + protected LogicalPlan createApp(StreamingApplication app, LogicalPlanConfiguration planConfig) + { + LogicalPlan dag = new LogicalPlan(); + planConfig.prepareDAG(dag, app, getName()); + return dag; + } + + public String getName() + { + return name; + } + + public String getDisplayName() + { + ApplicationAnnotation an = appClazz.getAnnotation(ApplicationAnnotation.class); + if (an != null) { + return an.name(); + } else { + return name; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode ---------------------------------------------------------------------- diff --git a/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode b/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode index 712a323..cb9119f 100644 --- a/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode +++ b/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode @@ -1 +1 @@ -com.datatorrent.stram.LocalModeImpl \ No newline at end of file +org.apache.apex.engine.EmbeddedAppLauncherImpl http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher ---------------------------------------------------------------------- diff --git a/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher b/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher new file mode 100644 index 0000000..cb9119f --- /dev/null +++ b/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher @@ -0,0 +1 @@ +org.apache.apex.engine.EmbeddedAppLauncherImpl http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher ---------------------------------------------------------------------- diff --git a/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher b/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher new file mode 100644 index 0000000..45989a4 --- /dev/null +++ b/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher @@ -0,0 +1 @@ +org.apache.apex.engine.YarnAppLauncherImpl
