Repository: incubator-blur Updated Branches: refs/heads/master 28f574671 -> 0e6aed41b
Creating a stream util, likely required for anything that uses blur streaming. Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/647c66bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/647c66bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/647c66bb Branch: refs/heads/master Commit: 647c66bbe2072ba5c674ab33914c7a248e208f4d Parents: 28f5746 Author: Aaron McCurry <amccu...@gmail.com> Authored: Tue Oct 13 08:08:11 2015 -0400 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Tue Oct 13 08:08:11 2015 -0400 ---------------------------------------------------------------------- .../apache/blur/command/stream/StreamUtil.java | 143 +++++++++++++++++++ .../org/apache/blur/spark/BlurSparkUtil.java | 143 +------------------ 2 files changed, 146 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/647c66bb/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java index e4811b2..6d2ced3 100644 --- a/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java @@ -16,18 +16,41 @@ */ package org.apache.blur.command.stream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.DataInput; import java.io.DataOutput; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; +import java.io.OutputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashSet; +import java.util.Set; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.ByteArrayOutputStream; +import com.google.common.base.Splitter; + public class StreamUtil { private static final String UTF_8 = "UTF-8"; + private static final char JAR_END = '!'; + private static final String JAR_START = "jar:"; + private static final String TMP_BLUR_JOB = "tmp-blur-job_"; + private static final String JAR = ".jar"; + private static final String SEP = "/"; + private static final String PATH_SEPARATOR = "path.separator"; + private static final String JAVA_CLASS_PATH = "java.class.path"; public static void writeString(DataOutput output, String s) throws IOException { try { @@ -53,4 +76,124 @@ public class StreamUtil { out.close(); return outputStream.toByteArray(); } + + public static String[] getJarPaths(Class<?>... clazzes) throws IOException { + Set<String> classPathThatNeedsToBeIncluded = findJarFiles(clazzes); + Set<String> jars = new HashSet<String>(); + for (String s : classPathThatNeedsToBeIncluded) { + if (isJarFile(s)) { + jars.add(s); + } else { + jars.add(createJar(s)); + } + } + return jars.toArray(new String[jars.size()]); + } + + private static Set<String> findJarFiles(Class<?>[] clazzes) { + Set<String> result = new HashSet<String>(); + for (Class<?> c : clazzes) { + result.add(findJarFile(c)); + } + return result; + } + + private static String findJarFile(Class<?> c) { + String resourceName = "/" + c.getName().replace('.', '/') + ".class"; + URL url = StreamUtil.class.getResource(resourceName); + try { + URI uri = url.toURI(); + if (uri.getScheme().equals("file")) { + return findFileInClassFileUri(uri); + } else if (uri.getScheme().equals("jar")) { + return findFileInJarFileUri(uri); + } else { + throw new RuntimeException("Unsupported schema [" + uri.getScheme() + "] for uri [" + uri + "]"); + } + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private static String findFileInClassFileUri(URI uri) { + String classPath = System.getProperty(JAVA_CLASS_PATH); + String pathSeparator = System.getProperty(PATH_SEPARATOR); + Splitter splitter = Splitter.on(pathSeparator); + Iterable<String> split = splitter.split(classPath); + String path = uri.getPath(); + for (String s : split) { + if (path.startsWith(s)) { + return new File(s).getAbsolutePath(); + } + } + throw new RuntimeException("Uri [" + uri + "] was not found on the classpath."); + } + + private static String findFileInJarFileUri(URI uri) throws URISyntaxException { + String s = uri.toString(); + int indexOf1 = s.indexOf(JAR_START); + int indexOf2 = s.indexOf(JAR_END); + return new File(new URI(s.substring(indexOf1 + JAR_START.length(), indexOf2))).getAbsolutePath(); + } + + private static String createJar(String s) throws IOException { + File sourceFile = new File(s); + if (sourceFile.isDirectory()) { + File file = File.createTempFile(TMP_BLUR_JOB, JAR); + OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file)); + JarOutputStream jarOut = new JarOutputStream(outputStream); + for (File f : sourceFile.listFiles()) { + pack(sourceFile, f, jarOut); + } + jarOut.close(); + file.deleteOnExit(); + return file.getAbsolutePath(); + } + throw new RuntimeException("File [" + s + "] is not a directory."); + } + + private static void pack(File rootPath, File source, JarOutputStream target) throws IOException { + String name = getName(rootPath, source); + if (source.isDirectory()) { + if (!SEP.equals(name)) { + JarEntry entry = new JarEntry(name); + entry.setTime(source.lastModified()); + target.putNextEntry(entry); + target.closeEntry(); + } + for (File f : source.listFiles()) { + pack(rootPath, f, target); + } + } else { + JarEntry entry = new JarEntry(name); + entry.setTime(source.lastModified()); + target.putNextEntry(entry); + BufferedInputStream in = new BufferedInputStream(new FileInputStream(source)); + IOUtils.copy(in, target); + in.close(); + target.closeEntry(); + } + } + + private static String getName(File rootPath, File source) { + String rootStr = rootPath.toURI().toString(); + String sourceStr = source.toURI().toString(); + if (sourceStr.startsWith(rootStr)) { + String result = sourceStr.substring(rootStr.length()); + if (source.isDirectory() && !result.endsWith(SEP)) { + result += SEP; + } + return result; + } else { + throw new RuntimeException("Not sure what happened."); + } + } + + private static boolean isJarFile(String s) { + if (s.endsWith(JAR) || s.endsWith(".zip")) { + return true; + } + return false; + } + } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/647c66bb/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java ---------------------------------------------------------------------- diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java index a1f54fc..6235d49 100644 --- a/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java +++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java @@ -16,153 +16,16 @@ */ package org.apache.blur.spark; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.HashSet; -import java.util.Set; -import java.util.jar.JarEntry; -import java.util.jar.JarOutputStream; -import org.apache.commons.io.IOUtils; +import org.apache.blur.command.stream.StreamUtil; import org.apache.spark.SparkConf; -import com.google.common.base.Splitter; - public class BlurSparkUtil { - private static final char JAR_END = '!'; - private static final String JAR_START = "jar:"; - private static final String TMP_SPARK_JOB = "tmp-spark-job_"; - private static final String JAR = ".jar"; - private static final String SEP = "/"; - private static final String PATH_SEPARATOR = "path.separator"; - private static final String JAVA_CLASS_PATH = "java.class.path"; - public static void packJars(SparkConf conf, Class<?>... clazzes) throws IOException { - Set<String> classPathThatNeedsToBeIncluded = findJarFiles(clazzes); - Set<String> jars = new HashSet<String>(); - for (String s : classPathThatNeedsToBeIncluded) { - if (isJarFile(s)) { - jars.add(s); - } else { - jars.add(createJar(s)); - } - } - conf.setJars(jars.toArray(new String[jars.size()])); - } - - private static Set<String> findJarFiles(Class<?>[] clazzes) { - Set<String> result = new HashSet<String>(); - for (Class<?> c : clazzes) { - result.add(findJarFile(c)); - } - return result; - } - - private static String findJarFile(Class<?> c) { - String resourceName = "/" + c.getName().replace('.', '/') + ".class"; - URL url = BlurSparkUtil.class.getResource(resourceName); - try { - URI uri = url.toURI(); - if (uri.getScheme().equals("file")) { - return findFileInClassFileUri(uri); - } else if (uri.getScheme().equals("jar")) { - return findFileInJarFileUri(uri); - } else { - throw new RuntimeException("Unsupported schema [" + uri.getScheme() + "] for uri [" + uri + "]"); - } - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - private static String findFileInClassFileUri(URI uri) { - String classPath = System.getProperty(JAVA_CLASS_PATH); - String pathSeparator = System.getProperty(PATH_SEPARATOR); - Splitter splitter = Splitter.on(pathSeparator); - Iterable<String> split = splitter.split(classPath); - String path = uri.getPath(); - for (String s : split) { - if (path.startsWith(s)) { - return new File(s).getAbsolutePath(); - } - } - throw new RuntimeException("Uri [" + uri + "] was not found on the classpath."); - } - - private static String findFileInJarFileUri(URI uri) throws URISyntaxException { - String s = uri.toString(); - int indexOf1 = s.indexOf(JAR_START); - int indexOf2 = s.indexOf(JAR_END); - return new File(new URI(s.substring(indexOf1 + JAR_START.length(), indexOf2))).getAbsolutePath(); - } - - private static String createJar(String s) throws IOException { - File sourceFile = new File(s); - if (sourceFile.isDirectory()) { - File file = File.createTempFile(TMP_SPARK_JOB, JAR); - OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file)); - JarOutputStream jarOut = new JarOutputStream(outputStream); - for (File f : sourceFile.listFiles()) { - pack(sourceFile, f, jarOut); - } - jarOut.close(); - file.deleteOnExit(); - return file.getAbsolutePath(); - } - throw new RuntimeException("File [" + s + "] is not a directory."); - } - - private static void pack(File rootPath, File source, JarOutputStream target) throws IOException { - String name = getName(rootPath, source); - if (source.isDirectory()) { - if (!SEP.equals(name)) { - JarEntry entry = new JarEntry(name); - entry.setTime(source.lastModified()); - target.putNextEntry(entry); - target.closeEntry(); - } - for (File f : source.listFiles()) { - pack(rootPath, f, target); - } - } else { - JarEntry entry = new JarEntry(name); - entry.setTime(source.lastModified()); - target.putNextEntry(entry); - BufferedInputStream in = new BufferedInputStream(new FileInputStream(source)); - IOUtils.copy(in, target); - in.close(); - target.closeEntry(); - } - } - - private static String getName(File rootPath, File source) { - String rootStr = rootPath.toURI().toString(); - String sourceStr = source.toURI().toString(); - if (sourceStr.startsWith(rootStr)) { - String result = sourceStr.substring(rootStr.length()); - if (source.isDirectory() && !result.endsWith(SEP)) { - result += SEP; - } - return result; - } else { - throw new RuntimeException("Not sure what happened."); - } - } - - private static boolean isJarFile(String s) { - if (s.endsWith(JAR) || s.endsWith(".zip")) { - return true; - } - return false; + String[] jarPaths = StreamUtil.getJarPaths(clazzes); + conf.setJars(jarPaths); } }