http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java index aebb136..90b50ea 100755 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java @@ -32,192 +32,199 @@ import com.yahoo.labs.samoa.core.Processor; */ public class TopologyBuilder { - // TODO: - // Possible options: - // 1. we may convert this as interface and platform dependent builder will inherit this method - // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology - // -ve -> fat class where it has capabilities to instantiate specific component and connecting them - // +ve -> easy abstraction for SAMOA developer "you just implement your builder logic here!" - private ComponentFactory componentFactory; - private Topology topology; - private Map<Processor, IProcessingItem> mapProcessorToProcessingItem; - - // TODO: refactor, temporary constructor used by Storm code - public TopologyBuilder() { - // TODO: initialize _componentFactory using dynamic binding - // for now, use StormComponentFactory - // should the factory be Singleton (?) - // ans: at the moment, no, i.e. each builder will has its associated factory! - // and the factory will be instantiated using dynamic binding - // this.componentFactory = new StormComponentFactory(); - } - - // TODO: refactor, temporary constructor used by S4 code - public TopologyBuilder(ComponentFactory theFactory) { - this.componentFactory = theFactory; - } - - /** - * Initiates topology with a specific name. - * - * @param topologyName - */ - public void initTopology(String topologyName) { - this.initTopology(topologyName, 0); - } - - /** - * Initiates topology with a specific name and a delay between consecutive instances. - * - * @param topologyName - * @param delay - * delay between injections of two instances from source (in milliseconds) - */ - public void initTopology(String topologyName, int delay) { - if (this.topology != null) { - // TODO: possible refactor this code later - System.out.println("Topology has been initialized before!"); - return; - } - this.topology = componentFactory.createTopology(topologyName); - } - - /** - * Returns the platform specific topology. - * - * @return - */ - public Topology build() { - return topology; - } - - public ProcessingItem addProcessor(Processor processor, int parallelism) { - ProcessingItem pi = createPi(processor, parallelism); - if (this.mapProcessorToProcessingItem == null) - this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); - this.mapProcessorToProcessingItem.put(processor, pi); - return pi; - } - - public ProcessingItem addProcessor(Processor processor) { - return addProcessor(processor, 1); - } - - public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - return pi.connectInputShuffleStream(inputStream); - } - - public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - return pi.connectInputKeyStream(inputStream); - } - - public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - return pi.connectInputAllStream(inputStream); - } - - public Stream createInputShuffleStream(Processor processor, Processor dest) { - Stream inputStream = this.createStream(dest); - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - pi.connectInputShuffleStream(inputStream); - return inputStream; - } - - public Stream createInputKeyStream(Processor processor, Processor dest) { - Stream inputStream = this.createStream(dest); - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - pi.connectInputKeyStream(inputStream); - return inputStream; - } - - public Stream createInputAllStream(Processor processor, Processor dest) { - Stream inputStream = this.createStream(dest); - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - pi.connectInputAllStream(inputStream); - return inputStream; - } - - public Stream createStream(Processor processor) { - IProcessingItem pi = mapProcessorToProcessingItem.get(processor); - Stream ret = null; - Preconditions.checkNotNull(pi, "Trying to create stream from null PI"); - ret = this.createStream(pi); - if (pi instanceof EntranceProcessingItem) - ((EntranceProcessingItem) pi).setOutputStream(ret); - return ret; - } - - public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) { - EntranceProcessingItem pi = createEntrancePi(entranceProcessor); - if (this.mapProcessorToProcessingItem == null) - this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); - mapProcessorToProcessingItem.put(entranceProcessor, pi); - return pi; - } - - public ProcessingItem getProcessingItem(Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to retrieve null PI"); - return pi; - } - - /** - * Creates a processing item with a specific processor and paralellism level of 1. - * - * @param processor - * @return ProcessingItem - */ - @SuppressWarnings("unused") - private ProcessingItem createPi(Processor processor) { - return createPi(processor, 1); - } - - /** - * Creates a processing item with a specific processor and paralellism level. - * - * @param processor - * @param parallelism - * @return ProcessingItem - */ - private ProcessingItem createPi(Processor processor, int parallelism) { - ProcessingItem pi = this.componentFactory.createPi(processor, parallelism); - this.topology.addProcessingItem(pi, parallelism); - return pi; - } - - /** - * Creates a platform specific entrance processing item. - * - * @param processor - * @return - */ - private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor); - this.topology.addEntranceProcessingItem(epi); - if (this.mapProcessorToProcessingItem == null) - this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); - this.mapProcessorToProcessingItem.put(processor, epi); - return epi; - } - - /** - * Creates a platform specific stream. - * - * @param sourcePi - * source processing item. - * @return - */ - private Stream createStream(IProcessingItem sourcePi) { - Stream stream = this.componentFactory.createStream(sourcePi); - this.topology.addStream(stream); - return stream; - } + // TODO: + // Possible options: + // 1. we may convert this as interface and platform dependent builder will + // inherit this method + // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology + // -ve -> fat class where it has capabilities to instantiate specific + // component and connecting them + // +ve -> easy abstraction for SAMOA developer + // "you just implement your builder logic here!" + private ComponentFactory componentFactory; + private Topology topology; + private Map<Processor, IProcessingItem> mapProcessorToProcessingItem; + + // TODO: refactor, temporary constructor used by Storm code + public TopologyBuilder() { + // TODO: initialize _componentFactory using dynamic binding + // for now, use StormComponentFactory + // should the factory be Singleton (?) + // ans: at the moment, no, i.e. each builder will has its associated + // factory! + // and the factory will be instantiated using dynamic binding + // this.componentFactory = new StormComponentFactory(); + } + + // TODO: refactor, temporary constructor used by S4 code + public TopologyBuilder(ComponentFactory theFactory) { + this.componentFactory = theFactory; + } + + /** + * Initiates topology with a specific name. + * + * @param topologyName + */ + public void initTopology(String topologyName) { + this.initTopology(topologyName, 0); + } + + /** + * Initiates topology with a specific name and a delay between consecutive + * instances. + * + * @param topologyName + * @param delay + * delay between injections of two instances from source (in + * milliseconds) + */ + public void initTopology(String topologyName, int delay) { + if (this.topology != null) { + // TODO: possible refactor this code later + System.out.println("Topology has been initialized before!"); + return; + } + this.topology = componentFactory.createTopology(topologyName); + } + + /** + * Returns the platform specific topology. + * + * @return + */ + public Topology build() { + return topology; + } + + public ProcessingItem addProcessor(Processor processor, int parallelism) { + ProcessingItem pi = createPi(processor, parallelism); + if (this.mapProcessorToProcessingItem == null) + this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); + this.mapProcessorToProcessingItem.put(processor, pi); + return pi; + } + + public ProcessingItem addProcessor(Processor processor) { + return addProcessor(processor, 1); + } + + public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + return pi.connectInputShuffleStream(inputStream); + } + + public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + return pi.connectInputKeyStream(inputStream); + } + + public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + return pi.connectInputAllStream(inputStream); + } + + public Stream createInputShuffleStream(Processor processor, Processor dest) { + Stream inputStream = this.createStream(dest); + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + pi.connectInputShuffleStream(inputStream); + return inputStream; + } + + public Stream createInputKeyStream(Processor processor, Processor dest) { + Stream inputStream = this.createStream(dest); + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + pi.connectInputKeyStream(inputStream); + return inputStream; + } + + public Stream createInputAllStream(Processor processor, Processor dest) { + Stream inputStream = this.createStream(dest); + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + pi.connectInputAllStream(inputStream); + return inputStream; + } + + public Stream createStream(Processor processor) { + IProcessingItem pi = mapProcessorToProcessingItem.get(processor); + Stream ret = null; + Preconditions.checkNotNull(pi, "Trying to create stream from null PI"); + ret = this.createStream(pi); + if (pi instanceof EntranceProcessingItem) + ((EntranceProcessingItem) pi).setOutputStream(ret); + return ret; + } + + public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) { + EntranceProcessingItem pi = createEntrancePi(entranceProcessor); + if (this.mapProcessorToProcessingItem == null) + this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); + mapProcessorToProcessingItem.put(entranceProcessor, pi); + return pi; + } + + public ProcessingItem getProcessingItem(Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to retrieve null PI"); + return pi; + } + + /** + * Creates a processing item with a specific processor and paralellism level + * of 1. + * + * @param processor + * @return ProcessingItem + */ + @SuppressWarnings("unused") + private ProcessingItem createPi(Processor processor) { + return createPi(processor, 1); + } + + /** + * Creates a processing item with a specific processor and paralellism level. + * + * @param processor + * @param parallelism + * @return ProcessingItem + */ + private ProcessingItem createPi(Processor processor, int parallelism) { + ProcessingItem pi = this.componentFactory.createPi(processor, parallelism); + this.topology.addProcessingItem(pi, parallelism); + return pi; + } + + /** + * Creates a platform specific entrance processing item. + * + * @param processor + * @return + */ + private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor); + this.topology.addEntranceProcessingItem(epi); + if (this.mapProcessorToProcessingItem == null) + this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); + this.mapProcessorToProcessingItem.put(processor, epi); + return epi; + } + + /** + * Creates a platform specific stream. + * + * @param sourcePi + * source processing item. + * @return + */ + private Stream createStream(IProcessingItem sourcePi) { + Stream stream = this.componentFactory.createStream(sourcePi); + this.topology.addStream(stream); + return stream; + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java index ac6fc3f..457e407 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java @@ -22,11 +22,12 @@ package com.yahoo.labs.samoa.utils; /** * Represents the 3 schemes to partition the streams + * * @author Anh Thu Vu - * + * */ public enum PartitioningScheme { - SHUFFLE, GROUP_BY_KEY, BROADCAST + SHUFFLE, GROUP_BY_KEY, BROADCAST } // TODO: use this enum in S4 // Storm doesn't seem to need this \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java index 2781fb8..a759b26 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java @@ -23,43 +23,42 @@ package com.yahoo.labs.samoa.utils; import com.yahoo.labs.samoa.topology.IProcessingItem; /** - * Represents one destination for streams. It has the info of: - * the ProcessingItem, parallelismHint, and partitioning scheme. - * Usage: - * - When ProcessingItem connects to a stream, it will pass - * a StreamDestination to the stream. - * - Stream manages a set of StreamDestination. - * - Used in single-threaded and multi-threaded local mode. + * Represents one destination for streams. It has the info of: the + * ProcessingItem, parallelismHint, and partitioning scheme. Usage: - When + * ProcessingItem connects to a stream, it will pass a StreamDestination to the + * stream. - Stream manages a set of StreamDestination. - Used in + * single-threaded and multi-threaded local mode. + * * @author Anh Thu Vu - * + * */ public class StreamDestination { - private IProcessingItem pi; - private int parallelism; - private PartitioningScheme type; - - /* - * Constructor - */ - public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) { - this.pi = pi; - this.parallelism = parallelismHint; - this.type = type; - } - - /* - * Getters - */ - public IProcessingItem getProcessingItem() { - return this.pi; - } - - public int getParallelism() { - return this.parallelism; - } - - public PartitioningScheme getPartitioningScheme() { - return this.type; - } + private IProcessingItem pi; + private int parallelism; + private PartitioningScheme type; + + /* + * Constructor + */ + public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) { + this.pi = pi; + this.parallelism = parallelismHint; + this.type = type; + } + + /* + * Getters + */ + public IProcessingItem getProcessingItem() { + return this.pi; + } + + public int getParallelism() { + return this.parallelism; + } + + public PartitioningScheme getPartitioningScheme() { + return this.type; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java index ce18d78..bd819ad 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java @@ -35,148 +35,150 @@ import java.util.zip.ZipEntry; /** * Utils class for building and deploying applications programmatically. + * * @author severien - * + * */ public class Utils { - public static void buildSamoaPackage() { - try { - String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + "/samoa.jar"; - Manifest manifest = createManifest(); - - BufferedOutputStream bo; - - bo = new BufferedOutputStream(new FileOutputStream(output)); - JarOutputStream jo = new JarOutputStream(bo, manifest); - - String baseDir = System.getProperty("user.dir"); - System.out.println(baseDir); - - File samoaJar = new File(baseDir+"/target/samoa-0.0.1-SNAPSHOT.jar"); - addEntry(jo,samoaJar,baseDir+"/target/","/app/"); - addLibraries(jo); - - jo.close(); - bo.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - - // TODO should get the modules file from the parameters - public static void buildModulesPackage(List<String> modulesNames) { - System.out.println(System.getProperty("user.dir")); - try { - String baseDir = System.getProperty("user.dir"); - List<File> filesArray = new ArrayList<>(); - for (String module : modulesNames) { - module = "/"+module.replace(".", "/")+".class"; - filesArray.add(new File(baseDir+module)); - } - String output = System.getProperty("user.home") + "/modules.jar"; - - Manifest manifest = new Manifest(); - manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, - "1.0"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, - "http://samoa.yahoo.com"); - manifest.getMainAttributes().put( - Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, - "Yahoo"); - manifest.getMainAttributes().put( - Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); - - BufferedOutputStream bo; - - bo = new BufferedOutputStream(new FileOutputStream(output)); - JarOutputStream jo = new JarOutputStream(bo, manifest); - - File[] files = filesArray.toArray(new File[filesArray.size()]); - addEntries(jo,files, baseDir, ""); - - jo.close(); - bo.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - - private static void addLibraries(JarOutputStream jo) { - try { - String baseDir = System.getProperty("user.dir"); - String libDir = baseDir+"/target/lib"; - File inputFile = new File(libDir); - - File[] files = inputFile.listFiles(); - for (File file : files) { - addEntry(jo, file, baseDir, "lib"); - } - jo.close(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - - private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir){ - for (File file : files) { - - if (!file.isDirectory()) { - addEntry(jo, file, baseDir, rootDir); - } else { - File dir = new File(file.getAbsolutePath()); - addEntries(jo, dir.listFiles(), baseDir, rootDir); - } - } - } - - private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) { - try { - BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file)); - - String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir); - jo.putNextEntry(new ZipEntry(path)); - - byte[] buf = new byte[1024]; - int anz; - while ((anz = bi.read(buf)) != -1) { - jo.write(buf, 0, anz); - } - bi.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static Manifest createManifest() { - Manifest manifest = new Manifest(); - manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); - Attributes s4Attributes = new Attributes(); - s4Attributes.putValue("S4-App-Class", "path.to.Class"); - Attributes.Name name = new Attributes.Name("S4-App-Class"); - Attributes.Name S4Version = new Attributes.Name("S4-Version"); - manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp"); - manifest.getMainAttributes().put(S4Version, "0.6.0-incubating"); - return manifest; - } - - public static Object getInstance(String className) { + public static void buildSamoaPackage() { + try { + String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + // + "/samoa.jar"; + Manifest manifest = createManifest(); + + BufferedOutputStream bo; + + bo = new BufferedOutputStream(new FileOutputStream(output)); + JarOutputStream jo = new JarOutputStream(bo, manifest); + + String baseDir = System.getProperty("user.dir"); + System.out.println(baseDir); + + File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar"); + addEntry(jo, samoaJar, baseDir + "/target/", "/app/"); + addLibraries(jo); + + jo.close(); + bo.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + // TODO should get the modules file from the parameters + public static void buildModulesPackage(List<String> modulesNames) { + System.out.println(System.getProperty("user.dir")); + try { + String baseDir = System.getProperty("user.dir"); + List<File> filesArray = new ArrayList<>(); + for (String module : modulesNames) { + module = "/" + module.replace(".", "/") + ".class"; + filesArray.add(new File(baseDir + module)); + } + String output = System.getProperty("user.home") + "/modules.jar"; + + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, + "1.0"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, + "http://samoa.yahoo.com"); + manifest.getMainAttributes().put( + Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, + "Yahoo"); + manifest.getMainAttributes().put( + Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); + + BufferedOutputStream bo; + + bo = new BufferedOutputStream(new FileOutputStream(output)); + JarOutputStream jo = new JarOutputStream(bo, manifest); + + File[] files = filesArray.toArray(new File[filesArray.size()]); + addEntries(jo, files, baseDir, ""); + + jo.close(); + bo.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + private static void addLibraries(JarOutputStream jo) { + try { + String baseDir = System.getProperty("user.dir"); + String libDir = baseDir + "/target/lib"; + File inputFile = new File(libDir); + + File[] files = inputFile.listFiles(); + for (File file : files) { + addEntry(jo, file, baseDir, "lib"); + } + jo.close(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) { + for (File file : files) { + + if (!file.isDirectory()) { + addEntry(jo, file, baseDir, rootDir); + } else { + File dir = new File(file.getAbsolutePath()); + addEntries(jo, dir.listFiles(), baseDir, rootDir); + } + } + } + + private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) { + try { + BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file)); + + String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir); + jo.putNextEntry(new ZipEntry(path)); + + byte[] buf = new byte[1024]; + int anz; + while ((anz = bi.read(buf)) != -1) { + jo.write(buf, 0, anz); + } + bi.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static Manifest createManifest() { + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); + Attributes s4Attributes = new Attributes(); + s4Attributes.putValue("S4-App-Class", "path.to.Class"); + Attributes.Name name = new Attributes.Name("S4-App-Class"); + Attributes.Name S4Version = new Attributes.Name("S4-Version"); + manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp"); + manifest.getMainAttributes().put(S4Version, "0.6.0-incubating"); + return manifest; + } + + public static Object getInstance(String className) { Class<?> cls; - Object obj = null; - try { - cls = Class.forName(className); - obj = cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - e.printStackTrace(); - } - return obj; - } + Object obj = null; + try { + cls = Class.forName(className); + obj = cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + e.printStackTrace(); + } + return obj; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java index f82588b..e8d589f 100644 --- a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java +++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java @@ -27,71 +27,78 @@ import org.junit.Before; import org.junit.Test; public class DoubleVectorTest { - private DoubleVector emptyVector, array5Vector; - - @Before - public void setUp() { - emptyVector = new DoubleVector(); - array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 }); - } - - @Test - public void testGetArrayRef() { - assertThat(emptyVector.getArrayRef(), notNullValue()); - assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef()); - assertEquals(5, array5Vector.getArrayRef().length); - } - - @Test - public void testGetArrayCopy() { - double[] arrayRef; - arrayRef = emptyVector.getArrayRef(); - assertTrue(arrayRef != emptyVector.getArrayCopy()); - assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy()))); - - arrayRef = array5Vector.getArrayRef(); - assertTrue(arrayRef != array5Vector.getArrayCopy()); - assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy()))); - } - - @Test - public void testNumNonZeroEntries() { - assertEquals(0, emptyVector.numNonZeroEntries()); - assertEquals(3, array5Vector.numNonZeroEntries()); - } - - @Test(expected = IndexOutOfBoundsException.class) - public void testGetValueOutOfBound() { - @SuppressWarnings("unused") - double value = emptyVector.getArrayRef()[0]; - } - - @Test() - public void testSetValue() { - // test automatic vector enlargement - emptyVector.setValue(0, 1.0); - assertEquals(1, emptyVector.getArrayRef().length); - assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0 - - emptyVector.setValue(5, 5.5); - assertEquals(6, emptyVector.getArrayRef().length); - assertEquals(2, emptyVector.numNonZeroEntries()); - assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly the same, so delta=0.0 - } - - @Test - public void testAddToValue() { - array5Vector.addToValue(2, 5.0); - assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly the same, so delta=0.0 - - // test automatic vector enlargement - emptyVector.addToValue(0, 1.0); - assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0 - } - - @Test - public void testSumOfValues() { - assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL); - } + private DoubleVector emptyVector, array5Vector; + + @Before + public void setUp() { + emptyVector = new DoubleVector(); + array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 }); + } + + @Test + public void testGetArrayRef() { + assertThat(emptyVector.getArrayRef(), notNullValue()); + assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef()); + assertEquals(5, array5Vector.getArrayRef().length); + } + + @Test + public void testGetArrayCopy() { + double[] arrayRef; + arrayRef = emptyVector.getArrayRef(); + assertTrue(arrayRef != emptyVector.getArrayCopy()); + assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy()))); + + arrayRef = array5Vector.getArrayRef(); + assertTrue(arrayRef != array5Vector.getArrayCopy()); + assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy()))); + } + + @Test + public void testNumNonZeroEntries() { + assertEquals(0, emptyVector.numNonZeroEntries()); + assertEquals(3, array5Vector.numNonZeroEntries()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testGetValueOutOfBound() { + @SuppressWarnings("unused") + double value = emptyVector.getArrayRef()[0]; + } + + @Test() + public void testSetValue() { + // test automatic vector enlargement + emptyVector.setValue(0, 1.0); + assertEquals(1, emptyVector.getArrayRef().length); + assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly + // the same, so + // delta=0.0 + + emptyVector.setValue(5, 5.5); + assertEquals(6, emptyVector.getArrayRef().length); + assertEquals(2, emptyVector.numNonZeroEntries()); + assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly + // the same, so + // delta=0.0 + } + + @Test + public void testAddToValue() { + array5Vector.addToValue(2, 5.0); + assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly + // the same, so + // delta=0.0 + + // test automatic vector enlargement + emptyVector.addToValue(0, 1.0); + assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the + // same, so delta=0.0 + } + + @Test + public void testSumOfValues() { + assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java index 04f3184..51ec57d 100644 --- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java +++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java @@ -47,257 +47,260 @@ import org.junit.Before; import org.junit.Test; public class HDFSFileStreamSourceTest { - - private static final String[] HOSTS = {"localhost"}; - private static final String BASE_DIR = "/minidfsTest"; - private static final int NUM_FILES_IN_DIR = 4; - private static final int NUM_NOISE_FILES_IN_DIR = 2; - - private HDFSFileStreamSource streamSource; - - private Configuration config; - private MiniDFSCluster hdfsCluster; - private String hdfsURI; - - @Before - public void setUp() throws Exception { - // Start MiniDFSCluster - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1).format(true); - hdfsCluster = builder.build(); - hdfsCluster.waitActive(); - hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort(); - - // Construct stream source - streamSource = new HDFSFileStreamSource(); - - // General config - config = new Configuration(); - config.set("fs.defaultFS",hdfsURI); - } - - @After - public void tearDown() throws Exception { - hdfsCluster.shutdown(); - } - - /* - * Init tests - */ - @Test - public void testInitWithSingleFileAndExtension() { - // write input file - writeSimpleFiles(BASE_DIR,"txt",1); - - // init with path to input file - streamSource.init(config, BASE_DIR+"/1.txt", "txt"); - - //assertions - assertEquals("Size of filePaths is not correct.", 1,streamSource.getFilePathListSize(),0); - String fn = streamSource.getFilePathAt(0); - assertTrue("Incorrect file in filePaths.",fn.equals(BASE_DIR+"/1.txt") || fn.equals(hdfsURI+BASE_DIR+"1.txt")); - } - - @Test - public void testInitWithSingleFileAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR,"txt",1); - - // init with path to input file - streamSource.init(config, BASE_DIR+"/1.txt", null); - - // assertions - assertEquals("Size of filePaths is not correct.", 1,streamSource.getFilePathListSize(),0); - String fn = streamSource.getFilePathAt(0); - assertTrue("Incorrect file in filePaths.",fn.equals(BASE_DIR+"/1.txt") || fn.equals(hdfsURI+BASE_DIR+"1.txt")); - } - - @Test - public void testInitWithFolderAndExtension() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - writeSimpleFiles(BASE_DIR,null,NUM_NOISE_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0); - Set<String> filenames = new HashSet<String>(); - for (int i=1; i<=NUM_FILES_IN_DIR; i++) { - String targetFn = BASE_DIR+"/"+Integer.toString(i)+".txt"; - filenames.add(targetFn); - filenames.add(hdfsURI+targetFn); - } - for (int i=0; i<NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn)); - } - } - - @Test - public void testInitWithFolderAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR,null,NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, null); - - // assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0); - Set<String> filenames = new HashSet<String>(); - for (int i=1; i<=NUM_FILES_IN_DIR; i++) { - String targetFn = BASE_DIR+"/"+Integer.toString(i); - filenames.add(targetFn); - filenames.add(hdfsURI+targetFn); - } - for (int i=0; i< NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn)); - } - } - - /* - * getNextInputStream tests - */ - @Test - public void testGetNextInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // call getNextInputStream & assertions - Set<String> contents = new HashSet<String>(); - for (int i=1; i<=NUM_FILES_IN_DIR; i++) { - contents.add(Integer.toString(i)); - } - for (int i=0; i< NUM_FILES_IN_DIR; i++) { - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.",inStream); - - BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:"+i + ioe.getMessage()); - } - assertTrue("File content is incorrect.",contents.contains(inputRead)); - Iterator<String> it = contents.iterator(); - while (it.hasNext()) { - if (it.next().equals(inputRead)) { - it.remove(); - break; - } - } - } - - // assert that another call to getNextInputStream will return null - assertNull("Call getNextInputStream after the last file did not return null.",streamSource.getNextInputStream()); - } - - /* - * getCurrentInputStream tests - */ - public void testGetCurrentInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // call getNextInputStream, getCurrentInputStream & assertions - for (int i=0; i<= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list - InputStream inStream1 = streamSource.getNextInputStream(); - InputStream inStream2 = streamSource.getCurrentInputStream(); - assertSame("Incorrect current input stream.",inStream1, inStream2); - } - } - - /* - * reset tests - */ - public void testReset() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // Get the first input string - InputStream firstInStream = streamSource.getNextInputStream(); - String firstInput = null; - assertNotNull("Unexpected end of input stream list.",firstInStream); - - BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); - try { - firstInput = rd1.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - - // call getNextInputStream a few times - streamSource.getNextInputStream(); - - // call reset, call next, assert that output is 1 (the first file) - try { - streamSource.reset(); - } catch (IOException ioe) { - fail("Fail resetting stream source." + ioe.getMessage()); - } - - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.",inStream); - - BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd2.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - assertEquals("File content is incorrect.",firstInput,inputRead); - } - - private void writeSimpleFiles(String path, String ext, int numOfFiles) { - // get filesystem - FileSystem dfs; - try { - dfs = hdfsCluster.getFileSystem(); - } catch (IOException ioe) { - fail("Could not access MiniDFSCluster" + ioe.getMessage()); - return; - } - - // create basedir - Path basedir = new Path(path); - try { - dfs.mkdirs(basedir); - } catch (IOException ioe) { - fail("Could not create DIR:"+ path + "\n" + ioe.getMessage()); - return; - } - - // write files - for (int i=1; i<=numOfFiles; i++) { - String fn = null; - if (ext != null) { - fn = Integer.toString(i) + "."+ ext; - } else { - fn = Integer.toString(i); - } - - try { - OutputStream fin = dfs.create(new Path(path,fn)); - BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin)); - wr.write(Integer.toString(i)); - wr.close(); - fin.close(); - } catch (IOException ioe) { - fail("Fail writing to input file: "+ fn + " in directory: " + path + ioe.getMessage()); - } - } - } + + private static final String[] HOSTS = { "localhost" }; + private static final String BASE_DIR = "/minidfsTest"; + private static final int NUM_FILES_IN_DIR = 4; + private static final int NUM_NOISE_FILES_IN_DIR = 2; + + private HDFSFileStreamSource streamSource; + + private Configuration config; + private MiniDFSCluster hdfsCluster; + private String hdfsURI; + + @Before + public void setUp() throws Exception { + // Start MiniDFSCluster + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1) + .format(true); + hdfsCluster = builder.build(); + hdfsCluster.waitActive(); + hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort(); + + // Construct stream source + streamSource = new HDFSFileStreamSource(); + + // General config + config = new Configuration(); + config.set("fs.defaultFS", hdfsURI); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + + /* + * Init tests + */ + @Test + public void testInitWithSingleFileAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + streamSource.init(config, BASE_DIR + "/1.txt", "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertTrue("Incorrect file in filePaths.", + fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt")); + } + + @Test + public void testInitWithSingleFileAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + streamSource.init(config, BASE_DIR + "/1.txt", null); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertTrue("Incorrect file in filePaths.", + fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt")); + } + + @Test + public void testInitWithFolderAndExtension() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt"; + filenames.add(targetFn); + filenames.add(hdfsURI + targetFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + @Test + public void testInitWithFolderAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, null); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String targetFn = BASE_DIR + "/" + Integer.toString(i); + filenames.add(targetFn); + filenames.add(hdfsURI + targetFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + /* + * getNextInputStream tests + */ + @Test + public void testGetNextInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // call getNextInputStream & assertions + Set<String> contents = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + contents.add(Integer.toString(i)); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:" + i + ioe.getMessage()); + } + assertTrue("File content is incorrect.", contents.contains(inputRead)); + Iterator<String> it = contents.iterator(); + while (it.hasNext()) { + if (it.next().equals(inputRead)) { + it.remove(); + break; + } + } + } + + // assert that another call to getNextInputStream will return null + assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream()); + } + + /* + * getCurrentInputStream tests + */ + public void testGetCurrentInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // call getNextInputStream, getCurrentInputStream & assertions + for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list + InputStream inStream1 = streamSource.getNextInputStream(); + InputStream inStream2 = streamSource.getCurrentInputStream(); + assertSame("Incorrect current input stream.", inStream1, inStream2); + } + } + + /* + * reset tests + */ + public void testReset() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // Get the first input string + InputStream firstInStream = streamSource.getNextInputStream(); + String firstInput = null; + assertNotNull("Unexpected end of input stream list.", firstInStream); + + BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); + try { + firstInput = rd1.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + + // call getNextInputStream a few times + streamSource.getNextInputStream(); + + // call reset, call next, assert that output is 1 (the first file) + try { + streamSource.reset(); + } catch (IOException ioe) { + fail("Fail resetting stream source." + ioe.getMessage()); + } + + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd2.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + assertEquals("File content is incorrect.", firstInput, inputRead); + } + + private void writeSimpleFiles(String path, String ext, int numOfFiles) { + // get filesystem + FileSystem dfs; + try { + dfs = hdfsCluster.getFileSystem(); + } catch (IOException ioe) { + fail("Could not access MiniDFSCluster" + ioe.getMessage()); + return; + } + + // create basedir + Path basedir = new Path(path); + try { + dfs.mkdirs(basedir); + } catch (IOException ioe) { + fail("Could not create DIR:" + path + "\n" + ioe.getMessage()); + return; + } + + // write files + for (int i = 1; i <= numOfFiles; i++) { + String fn = null; + if (ext != null) { + fn = Integer.toString(i) + "." + ext; + } else { + fn = Integer.toString(i); + } + + try { + OutputStream fin = dfs.create(new Path(path, fn)); + BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin)); + wr.write(Integer.toString(i)); + wr.close(); + fin.close(); + } catch (IOException ioe) { + fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage()); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java index 21ca378..b121425 100644 --- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java +++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java @@ -43,234 +43,234 @@ import org.junit.Test; import org.apache.commons.io.FileUtils; public class LocalFileStreamSourceTest { - private static final String BASE_DIR = "localfsTest"; - private static final int NUM_FILES_IN_DIR = 4; - private static final int NUM_NOISE_FILES_IN_DIR = 2; - - private LocalFileStreamSource streamSource; - - @Before - public void setUp() throws Exception { - streamSource = new LocalFileStreamSource(); - - } - - @After - public void tearDown() throws Exception { - FileUtils.deleteDirectory(new File(BASE_DIR)); - } - - @Test - public void testInitWithSingleFileAndExtension() { - // write input file - writeSimpleFiles(BASE_DIR,"txt",1); - - // init with path to input file - File inFile = new File(BASE_DIR,"1.txt"); - String inFilePath = inFile.getAbsolutePath(); - streamSource.init(inFilePath, "txt"); - - //assertions - assertEquals("Size of filePaths is not correct.", 1,streamSource.getFilePathListSize(),0); - String fn = streamSource.getFilePathAt(0); - assertEquals("Incorrect file in filePaths.",inFilePath,fn); - } - - @Test - public void testInitWithSingleFileAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR,"txt",1); - - // init with path to input file - File inFile = new File(BASE_DIR,"1.txt"); - String inFilePath = inFile.getAbsolutePath(); - streamSource.init(inFilePath, null); - - //assertions - assertEquals("Size of filePaths is not correct.", 1,streamSource.getFilePathListSize(),0); - String fn = streamSource.getFilePathAt(0); - assertEquals("Incorrect file in filePaths.",inFilePath,fn); - } - - @Test - public void testInitWithFolderAndExtension() { - // write input file - writeSimpleFiles(BASE_DIR,null,NUM_NOISE_FILES_IN_DIR); - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - File inDir = new File(BASE_DIR); - String inDirPath = inDir.getAbsolutePath(); - streamSource.init(inDirPath, "txt"); - - //assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0); - Set<String> filenames = new HashSet<String>(); - for (int i=1; i<=NUM_FILES_IN_DIR; i++) { - String expectedFn = (new File(inDirPath,Integer.toString(i)+".txt")).getAbsolutePath(); - filenames.add(expectedFn); - } - for (int i=0; i< NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn)); - } - } - - @Test - public void testInitWithFolderAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR,null,NUM_FILES_IN_DIR); - - // init with path to input dir - File inDir = new File(BASE_DIR); - String inDirPath = inDir.getAbsolutePath(); - streamSource.init(inDirPath, null); - - //assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0); - Set<String> filenames = new HashSet<String>(); - for (int i=1; i<=NUM_FILES_IN_DIR; i++) { - String expectedFn = (new File(inDirPath,Integer.toString(i))).getAbsolutePath(); - filenames.add(expectedFn); - } - for (int i=0; i< NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn)); - } - } - - /* - * getNextInputStream tests - */ - @Test - public void testGetNextInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(BASE_DIR, "txt"); - - // call getNextInputStream & assertions - Set<String> contents = new HashSet<String>(); - for (int i=1; i<=NUM_FILES_IN_DIR; i++) { - contents.add(Integer.toString(i)); - } - for (int i=0; i< NUM_FILES_IN_DIR; i++) { - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.",inStream); - - BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:"+i + ioe.getMessage()); - } - assertTrue("File content is incorrect.",contents.contains(inputRead)); - Iterator<String> it = contents.iterator(); - while (it.hasNext()) { - if (it.next().equals(inputRead)) { - it.remove(); - break; - } - } - } - - // assert that another call to getNextInputStream will return null - assertNull("Call getNextInputStream after the last file did not return null.",streamSource.getNextInputStream()); - } - - /* - * getCurrentInputStream tests - */ - public void testGetCurrentInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(BASE_DIR, "txt"); - - // call getNextInputStream, getCurrentInputStream & assertions - for (int i=0; i<= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list - InputStream inStream1 = streamSource.getNextInputStream(); - InputStream inStream2 = streamSource.getCurrentInputStream(); - assertSame("Incorrect current input stream.",inStream1, inStream2); - } - } - - /* - * reset tests - */ - public void testReset() { - // write input files & noise files - writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(BASE_DIR, "txt"); - - // Get the first input string - InputStream firstInStream = streamSource.getNextInputStream(); - String firstInput = null; - assertNotNull("Unexpected end of input stream list.",firstInStream); - - BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); - try { - firstInput = rd1.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - - // call getNextInputStream a few times - streamSource.getNextInputStream(); - - // call reset, call next, assert that output is 1 (the first file) - try { - streamSource.reset(); - } catch (IOException ioe) { - fail("Fail resetting stream source." + ioe.getMessage()); - } - - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.",inStream); - - BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd2.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - assertEquals("File content is incorrect.",firstInput,inputRead); - } - - private void writeSimpleFiles(String path, String ext, int numOfFiles) { - // Create folder - File folder = new File(path); - if (!folder.exists()) { - try{ - folder.mkdir(); - } catch(SecurityException se){ - fail("Failed creating directory:"+path+se); - } - } - - // Write files - for (int i=1; i<=numOfFiles; i++) { - String fn = null; - if (ext != null) { - fn = Integer.toString(i) + "."+ ext; - } else { - fn = Integer.toString(i); - } - - try { - FileWriter fwr = new FileWriter(new File(path,fn)); - fwr.write(Integer.toString(i)); - fwr.close(); - } catch (IOException ioe) { - fail("Fail writing to input file: "+ fn + " in directory: " + path + ioe.getMessage()); - } - } - } + private static final String BASE_DIR = "localfsTest"; + private static final int NUM_FILES_IN_DIR = 4; + private static final int NUM_NOISE_FILES_IN_DIR = 2; + + private LocalFileStreamSource streamSource; + + @Before + public void setUp() throws Exception { + streamSource = new LocalFileStreamSource(); + + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(BASE_DIR)); + } + + @Test + public void testInitWithSingleFileAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + File inFile = new File(BASE_DIR, "1.txt"); + String inFilePath = inFile.getAbsolutePath(); + streamSource.init(inFilePath, "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertEquals("Incorrect file in filePaths.", inFilePath, fn); + } + + @Test + public void testInitWithSingleFileAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + File inFile = new File(BASE_DIR, "1.txt"); + String inFilePath = inFile.getAbsolutePath(); + streamSource.init(inFilePath, null); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertEquals("Incorrect file in filePaths.", inFilePath, fn); + } + + @Test + public void testInitWithFolderAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR); + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + File inDir = new File(BASE_DIR); + String inDirPath = inDir.getAbsolutePath(); + streamSource.init(inDirPath, "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String expectedFn = (new File(inDirPath, Integer.toString(i) + ".txt")).getAbsolutePath(); + filenames.add(expectedFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + @Test + public void testInitWithFolderAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR); + + // init with path to input dir + File inDir = new File(BASE_DIR); + String inDirPath = inDir.getAbsolutePath(); + streamSource.init(inDirPath, null); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String expectedFn = (new File(inDirPath, Integer.toString(i))).getAbsolutePath(); + filenames.add(expectedFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + /* + * getNextInputStream tests + */ + @Test + public void testGetNextInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // call getNextInputStream & assertions + Set<String> contents = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + contents.add(Integer.toString(i)); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:" + i + ioe.getMessage()); + } + assertTrue("File content is incorrect.", contents.contains(inputRead)); + Iterator<String> it = contents.iterator(); + while (it.hasNext()) { + if (it.next().equals(inputRead)) { + it.remove(); + break; + } + } + } + + // assert that another call to getNextInputStream will return null + assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream()); + } + + /* + * getCurrentInputStream tests + */ + public void testGetCurrentInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // call getNextInputStream, getCurrentInputStream & assertions + for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list + InputStream inStream1 = streamSource.getNextInputStream(); + InputStream inStream2 = streamSource.getCurrentInputStream(); + assertSame("Incorrect current input stream.", inStream1, inStream2); + } + } + + /* + * reset tests + */ + public void testReset() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // Get the first input string + InputStream firstInStream = streamSource.getNextInputStream(); + String firstInput = null; + assertNotNull("Unexpected end of input stream list.", firstInStream); + + BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); + try { + firstInput = rd1.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + + // call getNextInputStream a few times + streamSource.getNextInputStream(); + + // call reset, call next, assert that output is 1 (the first file) + try { + streamSource.reset(); + } catch (IOException ioe) { + fail("Fail resetting stream source." + ioe.getMessage()); + } + + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd2.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + assertEquals("File content is incorrect.", firstInput, inputRead); + } + + private void writeSimpleFiles(String path, String ext, int numOfFiles) { + // Create folder + File folder = new File(path); + if (!folder.exists()) { + try { + folder.mkdir(); + } catch (SecurityException se) { + fail("Failed creating directory:" + path + se); + } + } + + // Write files + for (int i = 1; i <= numOfFiles; i++) { + String fn = null; + if (ext != null) { + fn = Integer.toString(i) + "." + ext; + } else { + fn = Integer.toString(i); + } + + try { + FileWriter fwr = new FileWriter(new File(path, fn)); + fwr.write(Integer.toString(i)); + fwr.close(); + } catch (IOException ioe) { + fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage()); + } + } + } }
