Repository: apex-core Updated Branches: refs/heads/master 9d6408ea4 -> 9054fd2b9
APEXCORE-683 Apex client should support application packages on HDFS Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/926ecc89 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/926ecc89 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/926ecc89 Branch: refs/heads/master Commit: 926ecc89aa9083d1b8104f90d1e3bc31dd368bbd Parents: 04a352b Author: Vlad Rozov <[email protected]> Authored: Sat Mar 25 09:00:33 2017 -0700 Committer: Vlad Rozov <[email protected]> Committed: Tue Mar 28 10:52:21 2017 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/cli/ApexCli.java | 51 ++++++---- .../datatorrent/stram/client/AppPackage.java | 98 ++++++++++++++++---- .../stram/client/AppPackageTest.java | 17 +--- 3 files changed, 117 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index dfaae97..77959ab 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -20,6 +20,7 @@ package com.datatorrent.stram.cli; import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; @@ -139,7 +140,6 @@ import jline.console.completer.StringsCompleter; import jline.console.history.FileHistory; import jline.console.history.History; import jline.console.history.MemoryHistory; -import net.lingala.zip4j.exception.ZipException; import sun.misc.Signal; import sun.misc.SignalHandler; @@ -460,7 +460,26 @@ public class ApexCli } } - AppPackage newAppPackageInstance(File f) throws IOException, ZipException + AppPackage newAppPackageInstance(URI uri, boolean suppressOutput) throws IOException + { + PrintStream outputStream = suppressOutput ? suppressOutput() : null; + try { + final String scheme = uri.getScheme(); + if (scheme == null || scheme.equals("file")) { + return new AppPackage(new FileInputStream(new File(expandFileName(uri.getPath(), true))), true); + } else { + try (FileSystem fs = FileSystem.newInstance(uri, conf)) { + return new AppPackage(fs.open(new Path(uri.getPath())), true); + } + } + } finally { + if (outputStream != null) { + restoreOutput(outputStream); + } + } + } + + AppPackage newAppPackageInstance(File f) throws IOException { PrintStream outputStream = suppressOutput(); try { @@ -609,7 +628,7 @@ public class ApexCli "Connect to an app")); globalCommands.put("launch", new OptionsCommandSpec(new LaunchCommand(), new Arg[]{}, - new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file"), new Arg("matching-app-name")}, + new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file-path/app-package-file-uri"), new Arg("matching-app-name")}, "Launch an app", LAUNCH_OPTIONS.options)); globalCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(), new Arg[]{new Arg("app-id")}, @@ -673,17 +692,17 @@ public class ApexCli new Arg[]{new FileArg("parameter-name")}, "Get the configuration parameter")); globalCommands.put("get-app-package-info", new OptionsCommandSpec(new GetAppPackageInfoCommand(), - new Arg[]{new FileArg("app-package-file")}, + new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")}, new Arg[]{new Arg("-withDescription")}, "Get info on the app package file", GET_APP_PACKAGE_INFO_OPTIONS)); globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(), - new Arg[]{new FileArg("app-package-file")}, + new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")}, new Arg[]{new Arg("search-term")}, "Get operators within the given app package", GET_OPERATOR_CLASSES_OPTIONS.options)); globalCommands.put("get-app-package-operator-properties", new CommandSpec(new GetAppPackageOperatorPropertiesCommand(), - new Arg[]{new FileArg("app-package-file"), new Arg("operator-class")}, + new Arg[]{new FileArg("app-package-file-path/app-package-file-uri"), new Arg("operator-class")}, null, "Get operator properties within the given app package")); globalCommands.put("list-default-app-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.APPLICATION), @@ -771,7 +790,7 @@ public class ApexCli "Begin Logical Plan Change")); connectedCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(), null, - new Arg[]{new FileArg("jar-file/app-package-file"), new Arg("class-name")}, + new Arg[]{new FileArg("jar-file/app-package-file-path/app-package-file-uri"), new Arg("class-name")}, "Show logical plan of an app class", getShowLogicalPlanCommandLineOptions())); connectedCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(), @@ -1944,7 +1963,7 @@ public class ApexCli // see if it's an app package AppPackage ap = null; try { - ap = newAppPackageInstance(new File(fileName)); + ap = newAppPackageInstance(new URI(fileName), true); } catch (Exception ex) { // It's not an app package if (requiredAppPackageName != null) { @@ -2842,18 +2861,15 @@ public class ApexCli } if (commandLineInfo.args.length > 0) { - String filename = expandFileName(commandLineInfo.args[0], true); - // see if the first argument is actually an app package - try { - AppPackage ap = new AppPackage(new File(filename)); - ap.close(); + try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), false)) { new ShowLogicalPlanAppPackageCommand().execute(args, reader); return; } catch (Exception ex) { // fall through } + String filename = expandFileName(commandLineInfo.args[0], true); if (commandLineInfo.args.length >= 2) { String appName = commandLineInfo.args[1]; StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom); @@ -2938,8 +2954,7 @@ public class ApexCli @Override public void execute(String[] args, ConsoleReader reader) throws Exception { - String jarfile = expandFileName(args[1], true); - try (AppPackage ap = newAppPackageInstance(new File(jarfile))) { + try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) { List<AppInfo> applications = ap.getApplications(); if (args.length >= 3) { @@ -3499,7 +3514,7 @@ public class ApexCli String[] tmpArgs = new String[args.length - 2]; System.arraycopy(args, 2, tmpArgs, 0, args.length - 2); GetAppPackageInfoCommandLineInfo commandLineInfo = getGetAppPackageInfoCommandLineInfo(tmpArgs); - try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true)))) { + try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) { JSONSerializationProvider jomp = new JSONSerializationProvider(); jomp.addSerializer(PropertyInfo.class, new AppPackage.PropertyInfoSerializer(commandLineInfo.provideDescription)); @@ -3877,7 +3892,7 @@ public class ApexCli String[] tmpArgs = new String[args.length - 1]; System.arraycopy(args, 1, tmpArgs, 0, args.length - 1); GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(tmpArgs); - try (AppPackage ap = newAppPackageInstance(new File(expandFileName(commandLineInfo.args[0], true)))) { + try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), true)) { List<String> newArgs = new ArrayList<>(); List<String> jars = new ArrayList<>(); for (String jar : ap.getAppJars()) { @@ -3907,7 +3922,7 @@ public class ApexCli @Override public void execute(String[] args, ConsoleReader reader) throws Exception { - try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true)))) { + try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) { List<String> newArgs = new ArrayList<>(); List<String> jars = new ArrayList<>(); for (String jar : ap.getAppJars()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java index 238b646..a4d0364 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java +++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java @@ -18,8 +18,12 @@ */ package com.datatorrent.stram.client; +import java.io.Closeable; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; @@ -31,7 +35,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.jar.Attributes; +import java.util.jar.JarEntry; import java.util.jar.JarFile; +import java.util.jar.JarInputStream; import java.util.jar.Manifest; import org.codehaus.jackson.JsonGenerator; @@ -42,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -60,7 +67,7 @@ import net.lingala.zip4j.model.ZipParameters; * * @since 1.0.3 */ -public class AppPackage extends JarFile +public class AppPackage implements Closeable { public static final String ATTRIBUTE_DT_ENGINE_VERSION = "DT-Engine-Version"; public static final String ATTRIBUTE_DT_APP_PACKAGE_NAME = "DT-App-Package-Name"; @@ -160,9 +167,14 @@ public class AppPackage extends JarFile } } - public AppPackage(File file) throws IOException, ZipException + public AppPackage(File file) throws IOException { - this(file, false); + this(new FileInputStream(file)); + } + + public AppPackage(InputStream input) throws IOException + { + this(input, false); } /** @@ -178,11 +190,29 @@ public class AppPackage extends JarFile * @param contentFolder the folder that the app package will be extracted to * @param processAppDirectory * @throws java.io.IOException - * @throws net.lingala.zip4j.exception.ZipException */ - public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException, ZipException + public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException + { + this(new FileInputStream(file), contentFolder, processAppDirectory); + } + + /** + * Creates an App Package object. + * + * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived + * applications + * + * If contentFolder is not null, it will try to create the contentFolder, file will be retained on disk after App Package is closed + * If contentFolder is null, temp folder will be created and will be cleaned on close() + * + * @param input + * @param contentFolder the folder that the app package will be extracted to + * @param processAppDirectory + * @throws java.io.IOException + */ + public AppPackage(InputStream input, File contentFolder, boolean processAppDirectory) throws IOException { - super(file); + final JarInputStream jarInputStream = new JarInputStream(input); if (contentFolder != null) { FileUtils.forceMkdir(contentFolder); @@ -193,7 +223,7 @@ public class AppPackage extends JarFile } directory = contentFolder; - Manifest manifest = getManifest(); + Manifest manifest = jarInputStream.getManifest(); if (manifest == null) { throw new IOException("Not a valid app package. MANIFEST.MF is not present."); } @@ -209,7 +239,7 @@ public class AppPackage extends JarFile throw new IOException("Not a valid app package. App Package Name or Version or Class-Path is missing from MANIFEST.MF"); } classPath.addAll(Arrays.asList(StringUtils.split(classPathString, " "))); - extractToDirectory(directory, file); + extractToDirectory(directory, jarInputStream); File confDirectory = new File(directory, "conf"); if (confDirectory.exists()) { @@ -251,23 +281,56 @@ public class AppPackage extends JarFile * @param file * @param processAppDirectory * @throws java.io.IOException - * @throws net.lingala.zip4j.exception.ZipException */ - public AppPackage(File file, boolean processAppDirectory) throws IOException, ZipException + public AppPackage(File file, boolean processAppDirectory) throws IOException + { + this(new FileInputStream(file), processAppDirectory); + } + + /** + * Creates an App Package object. + * + * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived + * applications + * + * Files in app package will be extracted to tmp folder and will be cleaned on close() + * The close() method could be explicitly called or implicitly called by GC finalize() + * + * @param input + * @param processAppDirectory + * @throws java.io.IOException + */ + public AppPackage(InputStream input, boolean processAppDirectory) throws IOException { - this(file, null, processAppDirectory); + this(input, null, processAppDirectory); } - public static void extractToDirectory(File directory, File appPackageFile) throws ZipException + public static void extractToDirectory(File directory, File appPackageFile) throws IOException { - ZipFile zipFile = new ZipFile(appPackageFile); + extractToDirectory(directory, new JarInputStream(new FileInputStream(appPackageFile))); + } - if (zipFile.isEncrypted()) { - throw new ZipException("Encrypted app package not supported yet"); + private static void extractToDirectory(File directory, JarInputStream input) throws IOException + { + File manifestFile = new File(directory, JarFile.MANIFEST_NAME); + manifestFile.getParentFile().mkdirs(); + try (FileOutputStream output = new FileOutputStream(manifestFile)) { + input.getManifest().write(output); } - directory.mkdirs(); - zipFile.extractAll(directory.getAbsolutePath()); + JarEntry entry = input.getNextJarEntry(); + while (entry != null) { + File newFile = new File(directory, entry.getName()); + if (entry.isDirectory()) { + newFile.mkdirs(); + } else { + try (FileOutputStream output = new FileOutputStream(newFile)) { + IOUtils.copy(input, output); + } + } + input.closeEntry(); + entry = input.getNextJarEntry(); + } } public static void createAppPackageFile(File fileToBeCreated, File directory) throws ZipException @@ -286,7 +349,6 @@ public class AppPackage extends JarFile @Override public void close() throws IOException { - super.close(); if (cleanOnClose) { cleanContent(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java index 20550a9..f7bfa12 100644 --- a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java +++ b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java @@ -41,8 +41,6 @@ import com.datatorrent.stram.client.AppPackage.PropertyInfo; import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.util.JSONSerializationProvider; -import net.lingala.zip4j.exception.ZipException; - /** * */ @@ -57,24 +55,17 @@ public class AppPackageTest String appPackageDir = "src/test/resources/testAppPackage/mydtapp"; @BeforeClass - public static void starting() + public static void starting() throws IOException, JSONException { + File file = StramTestSupport.createAppPackageFile(); + // Set up test instance + ap = new AppPackage(file, true); try { - File file = StramTestSupport.createAppPackageFile(); - // Set up test instance - ap = new AppPackage(file, true); // set up another instance File testfolder = new File("target/testapp"); yap = new AppPackage(file, testfolder, false); jomp = new JSONSerializationProvider(); json = new JSONObject(jomp.getContext(null).writeValueAsString(ap)); - - } catch (ZipException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (JSONException e) { - throw new RuntimeException(e); } finally { IOUtils.closeQuietly(ap); IOUtils.closeQuietly(yap);
