http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java index 079423c..a4f885e 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java @@ -34,117 +34,117 @@ import org.apache.hadoop.io.IOUtils; /** * Source for FileStream for HDFS files + * * @author Casey */ public class HDFSFileStreamSource implements FileStreamSource { - - /** + + /** * */ - private static final long serialVersionUID = -3887354805787167400L; - - private transient InputStream fileStream; - private transient Configuration config; - private List<String> filePaths; - private int currentIndex; - - public HDFSFileStreamSource(){ - this.currentIndex = -1; - } - - public void init(String path, String ext) { - this.init(this.getDefaultConfig(), path, ext); - } - - public void init(Configuration config, String path, String ext) { - this.config = config; - this.filePaths = new ArrayList<String>(); - Path hdfsPath = new Path(path); - FileSystem fs; - try { - fs = FileSystem.get(config); - FileStatus fileStat = fs.getFileStatus(hdfsPath); - if (fileStat.isDirectory()) { - Path filterPath = hdfsPath; - if (ext != null) { - filterPath = new Path(path.toString(),"*."+ext); - } - else { - filterPath = new Path(path.toString(),"*"); - } - FileStatus[] filesInDir = fs.globStatus(filterPath); - for (int i=0; i<filesInDir.length; i++) { - if (filesInDir[i].isFile()) { - filePaths.add(filesInDir[i].getPath().toString()); - } - } - } - else { - this.filePaths.add(path); - } - } - catch(IOException ioe) { - throw new RuntimeException("Failed getting list of files at:"+path,ioe); - } - - this.currentIndex = -1; - } - - private Configuration getDefaultConfig() { - String hadoopHome = System.getenv("HADOOP_HOME"); - Configuration conf = new Configuration(); - if (hadoopHome != null) { - java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml"); - java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml"); - conf.addResource(new Path(coreSitePath.toAbsolutePath().toString())); - conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString())); + private static final long serialVersionUID = -3887354805787167400L; + + private transient InputStream fileStream; + private transient Configuration config; + private List<String> filePaths; + private int currentIndex; + + public HDFSFileStreamSource() { + this.currentIndex = -1; + } + + public void init(String path, String ext) { + this.init(this.getDefaultConfig(), path, ext); + } + + public void init(Configuration config, String path, String ext) { + this.config = config; + this.filePaths = new ArrayList<String>(); + Path hdfsPath = new Path(path); + FileSystem fs; + try { + fs = FileSystem.get(config); + FileStatus fileStat = fs.getFileStatus(hdfsPath); + if (fileStat.isDirectory()) { + Path filterPath = hdfsPath; + if (ext != null) { + filterPath = new Path(path.toString(), "*." + ext); } - return conf; - } - - public void reset() throws IOException { - this.currentIndex = -1; - this.closeFileStream(); - } - - private void closeFileStream() { - IOUtils.closeStream(fileStream); - } - - public InputStream getNextInputStream() { - this.closeFileStream(); - if (this.currentIndex >= (this.filePaths.size()-1)) return null; - - this.currentIndex++; - String filePath = this.filePaths.get(currentIndex); - - Path hdfsPath = new Path(filePath); - FileSystem fs; - try { - fs = FileSystem.get(config); - fileStream = fs.open(hdfsPath); + else { + filterPath = new Path(path.toString(), "*"); } - catch(IOException ioe) { - this.closeFileStream(); - throw new RuntimeException("Failed opening file:"+filePath,ioe); + FileStatus[] filesInDir = fs.globStatus(filterPath); + for (int i = 0; i < filesInDir.length; i++) { + if (filesInDir[i].isFile()) { + filePaths.add(filesInDir[i].getPath().toString()); + } } - - return fileStream; - } - - public InputStream getCurrentInputStream() { - return fileStream; - } - - protected int getFilePathListSize() { - if (filePaths != null) - return filePaths.size(); - return 0; - } - - protected String getFilePathAt(int index) { - if (filePaths != null && filePaths.size() > index) - return filePaths.get(index); - return null; - } + } + else { + this.filePaths.add(path); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed getting list of files at:" + path, ioe); + } + + this.currentIndex = -1; + } + + private Configuration getDefaultConfig() { + String hadoopHome = System.getenv("HADOOP_HOME"); + Configuration conf = new Configuration(); + if (hadoopHome != null) { + java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml"); + java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml"); + conf.addResource(new Path(coreSitePath.toAbsolutePath().toString())); + conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString())); + } + return conf; + } + + public void reset() throws IOException { + this.currentIndex = -1; + this.closeFileStream(); + } + + private void closeFileStream() { + IOUtils.closeStream(fileStream); + } + + public InputStream getNextInputStream() { + this.closeFileStream(); + if (this.currentIndex >= (this.filePaths.size() - 1)) + return null; + + this.currentIndex++; + String filePath = this.filePaths.get(currentIndex); + + Path hdfsPath = new Path(filePath); + FileSystem fs; + try { + fs = FileSystem.get(config); + fileStream = fs.open(hdfsPath); + } catch (IOException ioe) { + this.closeFileStream(); + throw new RuntimeException("Failed opening file:" + filePath, ioe); + } + + return fileStream; + } + + public InputStream getCurrentInputStream() { + return fileStream; + } + + protected int getFilePathListSize() { + if (filePaths != null) + return filePaths.size(); + return 0; + } + + protected String getFilePathAt(int index) { + if (filePaths != null && filePaths.size() > index) + return filePaths.get(index); + return null; + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java index c0ab44f..e4ceb70 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java @@ -31,101 +31,103 @@ import java.util.List; /** * Source for FileStream for local files + * * @author Casey */ public class LocalFileStreamSource implements FileStreamSource { - /** + /** * */ - private static final long serialVersionUID = 3986511547525870698L; - - private transient InputStream fileStream; - private List<String> filePaths; - private int currentIndex; - - public LocalFileStreamSource(){ - this.currentIndex = -1; - } - - public void init(String path, String ext) { - this.filePaths = new ArrayList<String>(); - File fileAtPath = new File(path); - if (fileAtPath.isDirectory()) { - File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext)); - for (int i=0; i<filesInDir.length; i++) { - filePaths.add(filesInDir[i].getAbsolutePath()); - } - } - else { - this.filePaths.add(path); - } - this.currentIndex = -1; - } - - public void reset() throws IOException { - this.currentIndex = -1; - this.closeFileStream(); - } - - private void closeFileStream() { - if (fileStream != null) { - try { - fileStream.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - } - } - - public InputStream getNextInputStream() { - this.closeFileStream(); - - if (this.currentIndex >= (this.filePaths.size()-1)) return null; - - this.currentIndex++; - String filePath = this.filePaths.get(currentIndex); - - File file = new File(filePath); - try { - fileStream = new FileInputStream(file); - } - catch(IOException ioe) { - this.closeFileStream(); - throw new RuntimeException("Failed opening file:"+filePath,ioe); - } - - return fileStream; - } - - public InputStream getCurrentInputStream() { - return fileStream; - } - - protected int getFilePathListSize() { - if (filePaths != null) - return filePaths.size(); - return 0; - } - - protected String getFilePathAt(int index) { - if (filePaths != null && filePaths.size() > index) - return filePaths.get(index); - return null; - } - - private class FileExtensionFilter implements FilenameFilter { - private String extension; - FileExtensionFilter(String ext) { - extension = ext; - } - - @Override - public boolean accept(File dir, String name) { - File f = new File(dir,name); - if (extension == null) - return f.isFile(); - else - return f.isFile() && name.toLowerCase().endsWith("."+extension); - } - } + private static final long serialVersionUID = 3986511547525870698L; + + private transient InputStream fileStream; + private List<String> filePaths; + private int currentIndex; + + public LocalFileStreamSource() { + this.currentIndex = -1; + } + + public void init(String path, String ext) { + this.filePaths = new ArrayList<String>(); + File fileAtPath = new File(path); + if (fileAtPath.isDirectory()) { + File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext)); + for (int i = 0; i < filesInDir.length; i++) { + filePaths.add(filesInDir[i].getAbsolutePath()); + } + } + else { + this.filePaths.add(path); + } + this.currentIndex = -1; + } + + public void reset() throws IOException { + this.currentIndex = -1; + this.closeFileStream(); + } + + private void closeFileStream() { + if (fileStream != null) { + try { + fileStream.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } + } + + public InputStream getNextInputStream() { + this.closeFileStream(); + + if (this.currentIndex >= (this.filePaths.size() - 1)) + return null; + + this.currentIndex++; + String filePath = this.filePaths.get(currentIndex); + + File file = new File(filePath); + try { + fileStream = new FileInputStream(file); + } catch (IOException ioe) { + this.closeFileStream(); + throw new RuntimeException("Failed opening file:" + filePath, ioe); + } + + return fileStream; + } + + public InputStream getCurrentInputStream() { + return fileStream; + } + + protected int getFilePathListSize() { + if (filePaths != null) + return filePaths.size(); + return 0; + } + + protected String getFilePathAt(int index) { + if (filePaths != null && filePaths.size() > index) + return filePaths.get(index); + return null; + } + + private class FileExtensionFilter implements FilenameFilter { + private String extension; + + FileExtensionFilter(String ext) { + extension = ext; + } + + @Override + public boolean accept(File dir, String name) { + File f = new File(dir, name); + if (extension == null) + return f.isFile(); + else + return f.isFile() && name.toLowerCase().endsWith("." + extension); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java index 4af3764..c62de1e 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java @@ -50,125 +50,138 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder; */ public class ClusteringEvaluation implements Task, Configurable { - private static final long serialVersionUID = -8246537378371580550L; + private static final long serialVersionUID = -8246537378371580550L; - private static final int DISTRIBUTOR_PARALLELISM = 1; + private static final int DISTRIBUTOR_PARALLELISM = 1; - private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class); + private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class); - public ClassOption learnerOption = new ClassOption("learner", 'l', "Clustering to run.", Learner.class, DistributedClusterer.class.getName()); + public ClassOption learnerOption = new ClassOption("learner", 'l', "Clustering to run.", Learner.class, + DistributedClusterer.class.getName()); - public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', "Input stream.", InstanceStream.class, - RandomRBFGeneratorEvents.class.getName()); + public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', "Input stream.", InstanceStream.class, + RandomRBFGeneratorEvents.class.getName()); - public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', "Maximum number of instances to test/train on (-1 = no limit).", 100000, -1, - Integer.MAX_VALUE); + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', + "Maximum number of instances to test/train on (-1 = no limit).", 100000, -1, + Integer.MAX_VALUE); - public IntOption measureCollectionTypeOption = new IntOption("measureCollectionType", 'm', "Type of measure collection", 0, 0, Integer.MAX_VALUE); + public IntOption measureCollectionTypeOption = new IntOption("measureCollectionType", 'm', + "Type of measure collection", 0, 0, Integer.MAX_VALUE); - public IntOption timeLimitOption = new IntOption("timeLimit", 't', "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, - Integer.MAX_VALUE); + public IntOption timeLimitOption = new IntOption("timeLimit", 't', + "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, + Integer.MAX_VALUE); - public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', "How many instances between samples of the learning performance.", 1000, 0, - Integer.MAX_VALUE); + public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', + "How many instances between samples of the learning performance.", 1000, 0, + Integer.MAX_VALUE); - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", "Clustering__" - + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "Clustering__" + + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", null, "csv", true); + public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", + null, "csv", true); - public FloatOption samplingThresholdOption = new FloatOption("samplingThreshold", 'a', "Ratio of instances sampled that will be used for evaluation.", 0.5, - 0.0, 1.0); + public FloatOption samplingThresholdOption = new FloatOption("samplingThreshold", 'a', + "Ratio of instances sampled that will be used for evaluation.", 0.5, + 0.0, 1.0); - private ClusteringEntranceProcessor source; - private InstanceStream streamTrain; - private ClusteringDistributorProcessor distributor; - private Stream distributorStream; - private Stream evaluationStream; - - // Default=0: no delay/waiting - public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', "How many miliseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); - - private Learner learner; - private ClusteringEvaluatorProcessor evaluator; + private ClusteringEntranceProcessor source; + private InstanceStream streamTrain; + private ClusteringDistributorProcessor distributor; + private Stream distributorStream; + private Stream evaluationStream; - private Topology topology; - private TopologyBuilder builder; + // Default=0: no delay/waiting + public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', + "How many miliseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); - public void getDescription(StringBuilder sb) { - sb.append("Clustering evaluation"); - } + private Learner learner; + private ClusteringEvaluatorProcessor evaluator; - @Override - public void init() { - // TODO remove the if statement theoretically, dynamic binding will work here! for now, the if statement is used by Storm - - if (builder == null) { - logger.warn("Builder was not initialized, initializing it from the Task"); - - builder = new TopologyBuilder(); - logger.debug("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue()); - logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - } - - // instantiate ClusteringEntranceProcessor and its output stream (sourceStream) - source = new ClusteringEntranceProcessor(); - streamTrain = this.streamTrainOption.getValue(); - source.setStreamSource(streamTrain); - builder.addEntranceProcessor(source); - source.setSamplingThreshold(samplingThresholdOption.getValue()); - source.setMaxNumInstances(instanceLimitOption.getValue()); - logger.debug("Successfully instantiated ClusteringEntranceProcessor"); - - Stream sourceStream = builder.createStream(source); - // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the EntrancePI - - // distribution of instances and sampling for evaluation - distributor = new ClusteringDistributorProcessor(); - builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM); - builder.connectInputShuffleStream(sourceStream, distributor); - distributorStream = builder.createStream(distributor); - distributor.setOutputStream(distributorStream); - evaluationStream = builder.createStream(distributor); - distributor.setEvaluationStream(evaluationStream); // passes evaluation events along - logger.debug("Successfully instantiated Distributor"); - - // instantiate learner and connect it to distributorStream - learner = this.learnerOption.getValue(); - learner.init(builder, source.getDataset(), 1); - builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor()); - logger.debug("Successfully instantiated Learner"); - - evaluator = new ClusteringEvaluatorProcessor.Builder( - sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()) - .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build(); + private Topology topology; + private TopologyBuilder builder; - builder.addProcessor(evaluator); - for (Stream evaluatorPiInputStream:learner.getResultStreams()) { - builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); - } - builder.connectInputAllStream(evaluationStream, evaluator); - logger.debug("Successfully instantiated EvaluatorProcessor"); + public void getDescription(StringBuilder sb) { + sb.append("Clustering evaluation"); + } - topology = builder.build(); - logger.debug("Successfully built the topology"); - } + @Override + public void init() { + // TODO remove the if statement theoretically, dynamic binding will work + // here! for now, the if statement is used by Storm - @Override - public void setFactory(ComponentFactory factory) { - // TODO unify this code with init() for now, it's used by S4 App - // dynamic binding theoretically will solve this problem - builder = new TopologyBuilder(factory); - logger.debug("Successfully instantiated TopologyBuilder"); + if (builder == null) { + logger.warn("Builder was not initialized, initializing it from the Task"); - builder.initTopology(evaluationNameOption.getValue()); - logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue()); + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); } - public Topology getTopology() { - return topology; + // instantiate ClusteringEntranceProcessor and its output stream + // (sourceStream) + source = new ClusteringEntranceProcessor(); + streamTrain = this.streamTrainOption.getValue(); + source.setStreamSource(streamTrain); + builder.addEntranceProcessor(source); + source.setSamplingThreshold(samplingThresholdOption.getValue()); + source.setMaxNumInstances(instanceLimitOption.getValue()); + logger.debug("Successfully instantiated ClusteringEntranceProcessor"); + + Stream sourceStream = builder.createStream(source); + // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the + // EntrancePI + + // distribution of instances and sampling for evaluation + distributor = new ClusteringDistributorProcessor(); + builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM); + builder.connectInputShuffleStream(sourceStream, distributor); + distributorStream = builder.createStream(distributor); + distributor.setOutputStream(distributorStream); + evaluationStream = builder.createStream(distributor); + distributor.setEvaluationStream(evaluationStream); // passes evaluation + // events along + logger.debug("Successfully instantiated Distributor"); + + // instantiate learner and connect it to distributorStream + learner = this.learnerOption.getValue(); + learner.init(builder, source.getDataset(), 1); + builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor()); + logger.debug("Successfully instantiated Learner"); + + evaluator = new ClusteringEvaluatorProcessor.Builder( + sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()) + .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build(); + + builder.addProcessor(evaluator); + for (Stream evaluatorPiInputStream : learner.getResultStreams()) { + builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); } + builder.connectInputAllStream(evaluationStream, evaluator); + logger.debug("Successfully instantiated EvaluatorProcessor"); + + topology = builder.build(); + logger.debug("Successfully built the topology"); + } + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiated TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + + public Topology getTopology() { + return topology; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java index 70c44a1..081d123 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java @@ -49,158 +49,174 @@ import com.yahoo.labs.samoa.topology.Topology; import com.yahoo.labs.samoa.topology.TopologyBuilder; /** - * Prequential Evaluation task is a scheme in evaluating performance of online classifiers which uses each instance for testing online classifiers model and - * then it further uses the same instance for training the model(Test-then-train) + * Prequential Evaluation task is a scheme in evaluating performance of online + * classifiers which uses each instance for testing online classifiers model and + * then it further uses the same instance for training the + * model(Test-then-train) * * @author Arinto Murdopo * */ public class PrequentialEvaluation implements Task, Configurable { - private static final long serialVersionUID = -8246537378371580550L; + private static final long serialVersionUID = -8246537378371580550L; - private static Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class); + private static Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class); - public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); + public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class, + VerticalHoeffdingTree.class.getName()); - public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.", InstanceStream.class, - RandomTreeGenerator.class.getName()); + public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.", + InstanceStream.class, + RandomTreeGenerator.class.getName()); - public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', "Classification performance evaluation method.", - PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName()); + public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', + "Classification performance evaluation method.", + PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName()); - public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', "Maximum number of instances to test/train on (-1 = no limit).", 1000000, -1, - Integer.MAX_VALUE); + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', + "Maximum number of instances to test/train on (-1 = no limit).", 1000000, -1, + Integer.MAX_VALUE); - public IntOption timeLimitOption = new IntOption("timeLimit", 't', "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, - Integer.MAX_VALUE); + public IntOption timeLimitOption = new IntOption("timeLimit", 't', + "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, + Integer.MAX_VALUE); - public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', "How many instances between samples of the learning performance.", 100000, - 0, Integer.MAX_VALUE); + public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', + "How many instances between samples of the learning performance.", 100000, + 0, Integer.MAX_VALUE); - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", "Prequential_" - + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "Prequential_" + + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", null, "csv", true); + public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", + null, "csv", true); - // Default=0: no delay/waiting - public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); - // Batch size to delay the incoming stream: delay of x milliseconds after each batch - public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE); - - private PrequentialSourceProcessor preqSource; + // Default=0: no delay/waiting + public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', + "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); + // Batch size to delay the incoming stream: delay of x milliseconds after each + // batch + public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', + "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE); - // private PrequentialSourceTopologyStarter preqStarter; + private PrequentialSourceProcessor preqSource; - // private EntranceProcessingItem sourcePi; + // private PrequentialSourceTopologyStarter preqStarter; - private Stream sourcePiOutputStream; + // private EntranceProcessingItem sourcePi; - private Learner classifier; + private Stream sourcePiOutputStream; - private EvaluatorProcessor evaluator; + private Learner classifier; - // private ProcessingItem evaluatorPi; + private EvaluatorProcessor evaluator; - // private Stream evaluatorPiInputStream; + // private ProcessingItem evaluatorPi; - private Topology prequentialTopology; + // private Stream evaluatorPiInputStream; - private TopologyBuilder builder; + private Topology prequentialTopology; - public void getDescription(StringBuilder sb, int indent) { - sb.append("Prequential evaluation"); - } + private TopologyBuilder builder; - @Override - public void init() { - // TODO remove the if statement - // theoretically, dynamic binding will work here! - // test later! - // for now, the if statement is used by Storm - - if (builder == null) { - builder = new TopologyBuilder(); - logger.debug("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - } - - // instantiate PrequentialSourceProcessor and its output stream (sourcePiOutputStream) - preqSource = new PrequentialSourceProcessor(); - preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue()); - preqSource.setMaxNumInstances(instanceLimitOption.getValue()); - preqSource.setSourceDelay(sourceDelayOption.getValue()); - preqSource.setDelayBatchSize(batchDelayOption.getValue()); - builder.addEntranceProcessor(preqSource); - logger.debug("Successfully instantiating PrequentialSourceProcessor"); - - // preqStarter = new PrequentialSourceTopologyStarter(preqSource, instanceLimitOption.getValue()); - // sourcePi = builder.createEntrancePi(preqSource, preqStarter); - // sourcePiOutputStream = builder.createStream(sourcePi); - - sourcePiOutputStream = builder.createStream(preqSource); - // preqStarter.setInputStream(sourcePiOutputStream); - - // instantiate classifier and connect it to sourcePiOutputStream - classifier = this.learnerOption.getValue(); - classifier.init(builder, preqSource.getDataset(), 1); - builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor()); - logger.debug("Successfully instantiating Classifier"); - - PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue(); - if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) { - evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier); - } - evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue) - .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build(); - - // evaluatorPi = builder.createPi(evaluator); - // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream); - builder.addProcessor(evaluator); - for (Stream evaluatorPiInputStream:classifier.getResultStreams()) { - builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); - } - - logger.debug("Successfully instantiating EvaluatorProcessor"); - - prequentialTopology = builder.build(); - logger.debug("Successfully building the topology"); - } + public void getDescription(StringBuilder sb, int indent) { + sb.append("Prequential evaluation"); + } - @Override - public void setFactory(ComponentFactory factory) { - // TODO unify this code with init() - // for now, it's used by S4 App - // dynamic binding theoretically will solve this problem - builder = new TopologyBuilder(factory); - logger.debug("Successfully instantiating TopologyBuilder"); + @Override + public void init() { + // TODO remove the if statement + // theoretically, dynamic binding will work here! + // test later! + // for now, the if statement is used by Storm - builder.initTopology(evaluationNameOption.getValue()); - logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + if (builder == null) { + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); } - public Topology getTopology() { - return prequentialTopology; + // instantiate PrequentialSourceProcessor and its output stream + // (sourcePiOutputStream) + preqSource = new PrequentialSourceProcessor(); + preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue()); + preqSource.setMaxNumInstances(instanceLimitOption.getValue()); + preqSource.setSourceDelay(sourceDelayOption.getValue()); + preqSource.setDelayBatchSize(batchDelayOption.getValue()); + builder.addEntranceProcessor(preqSource); + logger.debug("Successfully instantiating PrequentialSourceProcessor"); + + // preqStarter = new PrequentialSourceTopologyStarter(preqSource, + // instanceLimitOption.getValue()); + // sourcePi = builder.createEntrancePi(preqSource, preqStarter); + // sourcePiOutputStream = builder.createStream(sourcePi); + + sourcePiOutputStream = builder.createStream(preqSource); + // preqStarter.setInputStream(sourcePiOutputStream); + + // instantiate classifier and connect it to sourcePiOutputStream + classifier = this.learnerOption.getValue(); + classifier.init(builder, preqSource.getDataset(), 1); + builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor()); + logger.debug("Successfully instantiating Classifier"); + + PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue(); + if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) { + evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier); } - // - // @Override - // public TopologyStarter getTopologyStarter() { - // return this.preqStarter; - // } - - private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) { - return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) || - (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator); + evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue) + .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build(); + + // evaluatorPi = builder.createPi(evaluator); + // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream); + builder.addProcessor(evaluator); + for (Stream evaluatorPiInputStream : classifier.getResultStreams()) { + builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); } - - private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) { - if (learner instanceof RegressionLearner) { - return new BasicRegressionPerformanceEvaluator(); - } - // Default to BasicClassificationPerformanceEvaluator for all other cases - return new BasicClassificationPerformanceEvaluator(); + + logger.debug("Successfully instantiating EvaluatorProcessor"); + + prequentialTopology = builder.build(); + logger.debug("Successfully building the topology"); + } + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() + // for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + + public Topology getTopology() { + return prequentialTopology; + } + + // + // @Override + // public TopologyStarter getTopologyStarter() { + // return this.preqStarter; + // } + + private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) { + return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) || + (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator); + } + + private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) { + if (learner instanceof RegressionLearner) { + return new BasicRegressionPerformanceEvaluator(); } + // Default to BasicClassificationPerformanceEvaluator for all other cases + return new BasicClassificationPerformanceEvaluator(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java index 41b47e4..5753349 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java @@ -28,34 +28,34 @@ import com.yahoo.labs.samoa.topology.Topology; */ public interface Task { - /** - * Initialize this SAMOA task, - * i.e. create and connect ProcessingItems and Streams - * and initialize the topology - */ - public void init(); - - /** - * Return the final topology object to be executed in the cluster - * @return topology object to be submitted to be executed in the cluster - */ - public Topology getTopology(); - - // /** - // * Return the entrance processor to start SAMOA topology - // * The logic to start the topology should be implemented here - // * @return entrance processor to start the topology - // */ - // public TopologyStarter getTopologyStarter(); - - /** - * Sets the factory. - * TODO: propose to hide factory from task, - * i.e. Task will only see TopologyBuilder, - * and factory creation will be handled by TopologyBuilder - * - * @param factory the new factory - */ - public void setFactory(ComponentFactory factory) ; - + /** + * Initialize this SAMOA task, i.e. create and connect ProcessingItems and + * Streams and initialize the topology + */ + public void init(); + + /** + * Return the final topology object to be executed in the cluster + * + * @return topology object to be submitted to be executed in the cluster + */ + public Topology getTopology(); + + // /** + // * Return the entrance processor to start SAMOA topology + // * The logic to start the topology should be implemented here + // * @return entrance processor to start the topology + // */ + // public TopologyStarter getTopologyStarter(); + + /** + * Sets the factory. TODO: propose to hide factory from task, i.e. Task will + * only see TopologyBuilder, and factory creation will be handled by + * TopologyBuilder + * + * @param factory + * the new factory + */ + public void setFactory(ComponentFactory factory); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java index c0f0cc3..1e3c9b5 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java @@ -24,85 +24,93 @@ import com.yahoo.labs.samoa.core.EntranceProcessor; /** * Helper class for EntranceProcessingItem implementation. + * * @author Anh Thu Vu - * + * */ public abstract class AbstractEntranceProcessingItem implements EntranceProcessingItem { - private EntranceProcessor processor; - private String name; - private Stream outputStream; - - /* - * Constructor - */ - public AbstractEntranceProcessingItem() { - this(null); - } - public AbstractEntranceProcessingItem(EntranceProcessor processor) { - this.processor = processor; - } - - /* - * Processor - */ - /** - * Set the entrance processor for this EntranceProcessingItem - * @param processor - * the processor - */ - protected void setProcessor(EntranceProcessor processor) { - this.processor = processor; - } - - /** - * Get the EntranceProcessor of this EntranceProcessingItem. - * @return the EntranceProcessor - */ - public EntranceProcessor getProcessor() { - return this.processor; - } - - /* - * Name/ID - */ - /** - * Set the name (or ID) of this EntranceProcessingItem - * @param name - */ - public void setName(String name) { - this.name = name; - } - - /** - * Get the name (or ID) of this EntranceProcessingItem - * @return the name (or ID) - */ - public String getName() { - return this.name; - } - - /* - * Output Stream - */ - /** - * Set the output stream of this EntranceProcessingItem. - * An EntranceProcessingItem should have only 1 single output stream and - * should not be re-assigned. - * @return this EntranceProcessingItem - */ - public EntranceProcessingItem setOutputStream(Stream outputStream) { - if (this.outputStream != null && this.outputStream != outputStream) { - throw new IllegalStateException("Cannot overwrite output stream of EntranceProcessingItem"); - } else - this.outputStream = outputStream; - return this; - } - - /** - * Get the output stream of this EntranceProcessingItem. - * @return the output stream - */ - public Stream getOutputStream() { - return this.outputStream; - } + private EntranceProcessor processor; + private String name; + private Stream outputStream; + + /* + * Constructor + */ + public AbstractEntranceProcessingItem() { + this(null); + } + + public AbstractEntranceProcessingItem(EntranceProcessor processor) { + this.processor = processor; + } + + /* + * Processor + */ + /** + * Set the entrance processor for this EntranceProcessingItem + * + * @param processor + * the processor + */ + protected void setProcessor(EntranceProcessor processor) { + this.processor = processor; + } + + /** + * Get the EntranceProcessor of this EntranceProcessingItem. + * + * @return the EntranceProcessor + */ + public EntranceProcessor getProcessor() { + return this.processor; + } + + /* + * Name/ID + */ + /** + * Set the name (or ID) of this EntranceProcessingItem + * + * @param name + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the name (or ID) of this EntranceProcessingItem + * + * @return the name (or ID) + */ + public String getName() { + return this.name; + } + + /* + * Output Stream + */ + /** + * Set the output stream of this EntranceProcessingItem. An + * EntranceProcessingItem should have only 1 single output stream and should + * not be re-assigned. + * + * @return this EntranceProcessingItem + */ + public EntranceProcessingItem setOutputStream(Stream outputStream) { + if (this.outputStream != null && this.outputStream != outputStream) { + throw new IllegalStateException("Cannot overwrite output stream of EntranceProcessingItem"); + } else + this.outputStream = outputStream; + return this; + } + + /** + * Get the output stream of this EntranceProcessingItem. + * + * @return the output stream + */ + public Stream getOutputStream() { + return this.outputStream; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java index d0f04f7..60d76e0 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java @@ -26,136 +26,145 @@ import com.yahoo.labs.samoa.utils.PartitioningScheme; /** * Abstract ProcessingItem * - * Helper for implementation of ProcessingItem. It has basic information - * for a ProcessingItem: name, parallelismLevel and a processor. - * Subclass of this class needs to implement {@link #addInputStream(Stream, PartitioningScheme)}. + * Helper for implementation of ProcessingItem. It has basic information for a + * ProcessingItem: name, parallelismLevel and a processor. Subclass of this + * class needs to implement {@link #addInputStream(Stream, PartitioningScheme)}. * * @author Anh Thu Vu - * + * */ public abstract class AbstractProcessingItem implements ProcessingItem { - private String name; - private int parallelism; - private Processor processor; - - /* - * Constructor - */ - public AbstractProcessingItem() { - this(null); - } - public AbstractProcessingItem(Processor processor) { - this(processor,1); - } - public AbstractProcessingItem(Processor processor, int parallelism) { - this.processor = processor; - this.parallelism = parallelism; - } - - /* - * Processor - */ - /** - * Set the processor for this ProcessingItem - * @param processor - * the processor - */ - protected void setProcessor(Processor processor) { - this.processor = processor; - } - - /** - * Get the processor of this ProcessingItem - * @return the processor - */ - public Processor getProcessor() { - return this.processor; - } - - /* - * Parallelism - */ - /** - * Set the parallelism factor of this ProcessingItem - * @param parallelism - */ - protected void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - /** - * Get the parallelism factor of this ProcessingItem - * @return the parallelism factor - */ - @Override - public int getParallelism() { - return this.parallelism; - } - - /* - * Name/ID - */ - /** - * Set the name (or ID) of this ProcessingItem - * @param name - * the name/ID - */ - public void setName(String name) { - this.name = name; - } - - /** - * Get the name (or ID) of this ProcessingItem - * @return the name/ID - */ - public String getName() { - return this.name; - } - - /* - * Add input streams - */ - /** - * Add an input stream to this ProcessingItem - * - * @param inputStream - * the input stream to add - * @param scheme - * partitioning scheme associated with this ProcessingItem and the input stream - * @return this ProcessingItem - */ - protected abstract ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme); - - /** - * Add an input stream to this ProcessingItem with SHUFFLE scheme - * - * @param inputStream - * the input stream - * @return this ProcessingItem - */ - public ProcessingItem connectInputShuffleStream(Stream inputStream) { - return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE); - } - - /** - * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme - * - * @param inputStream - * the input stream - * @return this ProcessingItem - */ - public ProcessingItem connectInputKeyStream(Stream inputStream) { - return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY); - } - - /** - * Add an input stream to this ProcessingItem with BROADCAST scheme - * - * @param inputStream - * the input stream - * @return this ProcessingItem - */ - public ProcessingItem connectInputAllStream(Stream inputStream) { - return this.addInputStream(inputStream, PartitioningScheme.BROADCAST); - } + private String name; + private int parallelism; + private Processor processor; + + /* + * Constructor + */ + public AbstractProcessingItem() { + this(null); + } + + public AbstractProcessingItem(Processor processor) { + this(processor, 1); + } + + public AbstractProcessingItem(Processor processor, int parallelism) { + this.processor = processor; + this.parallelism = parallelism; + } + + /* + * Processor + */ + /** + * Set the processor for this ProcessingItem + * + * @param processor + * the processor + */ + protected void setProcessor(Processor processor) { + this.processor = processor; + } + + /** + * Get the processor of this ProcessingItem + * + * @return the processor + */ + public Processor getProcessor() { + return this.processor; + } + + /* + * Parallelism + */ + /** + * Set the parallelism factor of this ProcessingItem + * + * @param parallelism + */ + protected void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + /** + * Get the parallelism factor of this ProcessingItem + * + * @return the parallelism factor + */ + @Override + public int getParallelism() { + return this.parallelism; + } + + /* + * Name/ID + */ + /** + * Set the name (or ID) of this ProcessingItem + * + * @param name + * the name/ID + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the name (or ID) of this ProcessingItem + * + * @return the name/ID + */ + public String getName() { + return this.name; + } + + /* + * Add input streams + */ + /** + * Add an input stream to this ProcessingItem + * + * @param inputStream + * the input stream to add + * @param scheme + * partitioning scheme associated with this ProcessingItem and the + * input stream + * @return this ProcessingItem + */ + protected abstract ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme); + + /** + * Add an input stream to this ProcessingItem with SHUFFLE scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE); + } + + /** + * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputKeyStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY); + } + + /** + * Add an input stream to this ProcessingItem with BROADCAST scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputAllStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.BROADCAST); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java index b3544ed..a16d566 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java @@ -25,91 +25,95 @@ import com.yahoo.labs.samoa.core.ContentEvent; /** * Abstract Stream * - * Helper for implementation of Stream. It has basic information - * for a Stream: streamID and source ProcessingItem. - * Subclass of this class needs to implement {@link #put(ContentEvent)}. + * Helper for implementation of Stream. It has basic information for a Stream: + * streamID and source ProcessingItem. Subclass of this class needs to implement + * {@link #put(ContentEvent)}. * * @author Anh Thu Vu - * + * */ public abstract class AbstractStream implements Stream { - private String streamID; - private IProcessingItem sourcePi; - private int batchSize; - - /* - * Constructor - */ - public AbstractStream() { - this(null); - } - public AbstractStream(IProcessingItem sourcePi) { - this.sourcePi = sourcePi; - this.batchSize = 1; - } - - /** - * Get source processing item of this stream - * @return - */ - public IProcessingItem getSourceProcessingItem() { - return this.sourcePi; - } + private String streamID; + private IProcessingItem sourcePi; + private int batchSize; + + /* + * Constructor + */ + public AbstractStream() { + this(null); + } + + public AbstractStream(IProcessingItem sourcePi) { + this.sourcePi = sourcePi; + this.batchSize = 1; + } + + /** + * Get source processing item of this stream + * + * @return + */ + public IProcessingItem getSourceProcessingItem() { + return this.sourcePi; + } + + /* + * Process event + */ + @Override + /** + * Send a ContentEvent + * @param event + * the ContentEvent to be sent + */ + public abstract void put(ContentEvent event); + + /* + * Stream name + */ + /** + * Get name (ID) of this stream + * + * @return the name (ID) + */ + @Override + public String getStreamId() { + return this.streamID; + } - /* - * Process event - */ - @Override - /** - * Send a ContentEvent - * @param event - * the ContentEvent to be sent - */ - public abstract void put(ContentEvent event); + /** + * Set the name (ID) of this stream + * + * @param streamID + * the name (ID) + */ + public void setStreamId(String streamID) { + this.streamID = streamID; + } - /* - * Stream name - */ - /** - * Get name (ID) of this stream - * @return the name (ID) - */ - @Override - public String getStreamId() { - return this.streamID; - } - - /** - * Set the name (ID) of this stream - * @param streamID - * the name (ID) - */ - public void setStreamId (String streamID) { - this.streamID = streamID; - } - - /* - * Batch size - */ - /** - * Set suggested batch size - * - * @param batchSize - * the suggested batch size - * - */ - @Override - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } + /* + * Batch size + */ + /** + * Set suggested batch size + * + * @param batchSize + * the suggested batch size + * + */ + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } - /** - * Get suggested batch size - * - * @return the suggested batch size - */ - public int getBatchSize() { - return this.batchSize; - } + /** + * Get suggested batch size + * + * @return the suggested batch size + */ + public int getBatchSize() { + return this.batchSize; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java index 53385b1..00096ca 100755 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java @@ -26,108 +26,109 @@ import java.util.Set; /** * Topology abstract class. * - * It manages basic information of a topology: name, sets of Streams and ProcessingItems + * It manages basic information of a topology: name, sets of Streams and + * ProcessingItems * */ public abstract class AbstractTopology implements Topology { - private String topoName; - private Set<Stream> streams; - private Set<IProcessingItem> processingItems; - private Set<EntranceProcessingItem> entranceProcessingItems; + private String topoName; + private Set<Stream> streams; + private Set<IProcessingItem> processingItems; + private Set<EntranceProcessingItem> entranceProcessingItems; - protected AbstractTopology(String name) { - this.topoName = name; - this.streams = new HashSet<>(); - this.processingItems = new HashSet<>(); - this.entranceProcessingItems = new HashSet<>(); - } - - /** - * Gets the name of this topology - * - * @return name of the topology - */ - public String getTopologyName() { - return this.topoName; - } - - /** - * Sets the name of this topology - * - * @param topologyName - * name of the topology - */ - public void setTopologyName(String topologyName) { - this.topoName = topologyName; - } - - /** - * Adds an Entrance processing item to the topology. - * - * @param epi - * Entrance processing item - */ - public void addEntranceProcessingItem(EntranceProcessingItem epi) { - this.entranceProcessingItems.add(epi); - this.addProcessingItem(epi); - } - - /** - * Gets entrance processing items in the topology - * - * @return the set of processing items - */ - public Set<EntranceProcessingItem> getEntranceProcessingItems() { - return this.entranceProcessingItems; - } + protected AbstractTopology(String name) { + this.topoName = name; + this.streams = new HashSet<>(); + this.processingItems = new HashSet<>(); + this.entranceProcessingItems = new HashSet<>(); + } - /** - * Add processing item to topology. - * - * @param procItem - * Processing item. - */ - public void addProcessingItem(IProcessingItem procItem) { - addProcessingItem(procItem, 1); - } + /** + * Gets the name of this topology + * + * @return name of the topology + */ + public String getTopologyName() { + return this.topoName; + } - /** - * Add processing item to topology. - * - * @param procItem - * Processing item. - * @param parallelismHint - * Processing item parallelism level. - */ - public void addProcessingItem(IProcessingItem procItem, int parallelismHint) { - this.processingItems.add(procItem); - } - - /** - * Gets processing items in the topology (including entrance processing items) - * - * @return the set of processing items - */ - public Set<IProcessingItem> getProcessingItems() { - return this.processingItems; - } + /** + * Sets the name of this topology + * + * @param topologyName + * name of the topology + */ + public void setTopologyName(String topologyName) { + this.topoName = topologyName; + } - /** - * Add stream to topology. - * - * @param stream - */ - public void addStream(Stream stream) { - this.streams.add(stream); - } - - /** - * Gets streams in the topology - * - * @return the set of streams - */ - public Set<Stream> getStreams() { - return this.streams; - } + /** + * Adds an Entrance processing item to the topology. + * + * @param epi + * Entrance processing item + */ + public void addEntranceProcessingItem(EntranceProcessingItem epi) { + this.entranceProcessingItems.add(epi); + this.addProcessingItem(epi); + } + + /** + * Gets entrance processing items in the topology + * + * @return the set of processing items + */ + public Set<EntranceProcessingItem> getEntranceProcessingItems() { + return this.entranceProcessingItems; + } + + /** + * Add processing item to topology. + * + * @param procItem + * Processing item. + */ + public void addProcessingItem(IProcessingItem procItem) { + addProcessingItem(procItem, 1); + } + + /** + * Add processing item to topology. + * + * @param procItem + * Processing item. + * @param parallelismHint + * Processing item parallelism level. + */ + public void addProcessingItem(IProcessingItem procItem, int parallelismHint) { + this.processingItems.add(procItem); + } + + /** + * Gets processing items in the topology (including entrance processing items) + * + * @return the set of processing items + */ + public Set<IProcessingItem> getProcessingItems() { + return this.processingItems; + } + + /** + * Add stream to topology. + * + * @param stream + */ + public void addStream(Stream stream) { + this.streams.add(stream); + } + + /** + * Gets streams in the topology + * + * @return the set of streams + */ + public Set<Stream> getStreams() { + return this.streams; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java index 433f516..f1e82dc 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java @@ -28,51 +28,55 @@ import com.yahoo.labs.samoa.core.Processor; */ public interface ComponentFactory { - /** - * Creates a platform specific processing item with the specified processor. - * - * @param processor - * contains the logic for this processing item. - * @return ProcessingItem - */ - public ProcessingItem createPi(Processor processor); + /** + * Creates a platform specific processing item with the specified processor. + * + * @param processor + * contains the logic for this processing item. + * @return ProcessingItem + */ + public ProcessingItem createPi(Processor processor); - /** - * Creates a platform specific processing item with the specified processor. Additionally sets the parallelism level. - * - * @param processor - * contains the logic for this processing item. - * @param parallelism - * defines the amount of instances of this processing item will be created. - * @return ProcessingItem - */ - public ProcessingItem createPi(Processor processor, int parallelism); + /** + * Creates a platform specific processing item with the specified processor. + * Additionally sets the parallelism level. + * + * @param processor + * contains the logic for this processing item. + * @param parallelism + * defines the amount of instances of this processing item will be + * created. + * @return ProcessingItem + */ + public ProcessingItem createPi(Processor processor, int parallelism); - /** - * Creates a platform specific processing item with the specified processor that is the entrance point in the topology. This processing item can either - * generate a stream of data or connect to an external stream of data. - * - * @param entranceProcessor - * contains the logic for this processing item. - * @return EntranceProcessingItem - */ - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor); + /** + * Creates a platform specific processing item with the specified processor + * that is the entrance point in the topology. This processing item can either + * generate a stream of data or connect to an external stream of data. + * + * @param entranceProcessor + * contains the logic for this processing item. + * @return EntranceProcessingItem + */ + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor); - /** - * Creates a platform specific stream. - * - * @param sourcePi - * source processing item which will provide the events for this stream. - * @return Stream - */ - public Stream createStream(IProcessingItem sourcePi); + /** + * Creates a platform specific stream. + * + * @param sourcePi + * source processing item which will provide the events for this + * stream. + * @return Stream + */ + public Stream createStream(IProcessingItem sourcePi); - /** - * Creates a platform specific topology. - * - * @param topoName - * Topology name. - * @return Topology - */ - public Topology createTopology(String topoName); + /** + * Creates a platform specific topology. + * + * @param topoName + * Topology name. + * @return Topology + */ + public Topology createTopology(String topoName); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java index 32ed109..9698cc3 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java @@ -27,20 +27,21 @@ import com.yahoo.labs.samoa.core.EntranceProcessor; */ public interface EntranceProcessingItem extends IProcessingItem { - @Override - /** - * Gets the processing item processor. - * - * @return the embedded EntranceProcessor. - */ - public EntranceProcessor getProcessor(); + @Override + /** + * Gets the processing item processor. + * + * @return the embedded EntranceProcessor. + */ + public EntranceProcessor getProcessor(); - /** - * Set the single output stream for this EntranceProcessingItem. - * - * @param stream - * the stream - * @return the current instance of the EntranceProcessingItem for fluent interface. - */ - public EntranceProcessingItem setOutputStream(Stream stream); + /** + * Set the single output stream for this EntranceProcessingItem. + * + * @param stream + * the stream + * @return the current instance of the EntranceProcessingItem for fluent + * interface. + */ + public EntranceProcessingItem setOutputStream(Stream stream); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java index 7a70dc4..97ea9a4 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java @@ -26,22 +26,22 @@ import com.yahoo.labs.samoa.core.Processor; * ProcessingItem interface specific for entrance processing items. * * @author severien - * + * */ public interface IProcessingItem { - - /** - * Gets the processing item processor. - * - * @return Processor - */ - public Processor getProcessor(); - - /** - * Sets processing item name. - * - * @param name - */ - //public void setName(String name); + + /** + * Gets the processing item processor. + * + * @return Processor + */ + public Processor getProcessor(); + + /** + * Sets processing item name. + * + * @param name + */ + // public void setName(String name); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java index 8499f80..10c0690 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java @@ -23,24 +23,25 @@ package com.yahoo.labs.samoa.topology; import com.yahoo.labs.samoa.tasks.Task; /** - * Submitter interface for programatically deploying platform specific topologies. + * Submitter interface for programatically deploying platform specific + * topologies. * * @author severien - * + * */ public interface ISubmitter { - /** - * Deploy a specific task to a platform. - * - * @param task - */ - public void deployTask(Task task); - - /** - * Sets if the task should run locally or distributed. - * - * @param bool - */ - public void setLocal(boolean bool); + /** + * Deploy a specific task to a platform. + * + * @param task + */ + public void deployTask(Task task); + + /** + * Sets if the task should run locally or distributed. + * + * @param bool + */ + public void setLocal(boolean bool); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java index 2e8758f..1fe4642 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java @@ -24,62 +24,63 @@ import com.yahoo.labs.samoa.core.ContentEvent; import com.yahoo.labs.samoa.core.EntranceProcessor; /** - * Implementation of EntranceProcessingItem for local engines (Simple, Multithreads) + * Implementation of EntranceProcessingItem for local engines (Simple, + * Multithreads) * * @author Anh Thu Vu - * + * */ public class LocalEntranceProcessingItem extends AbstractEntranceProcessingItem { - public LocalEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - /** - * If there are available events, first event in the queue will be - * sent out on the output stream. - * @return true if there is (at least) one available event and it was sent out - * false otherwise - */ - public boolean injectNextEvent() { - if (this.getProcessor().hasNext()) { - ContentEvent event = this.getProcessor().nextEvent(); - this.getOutputStream().put(event); - return true; - } - return false; - } + public LocalEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + /** + * If there are available events, first event in the queue will be sent out on + * the output stream. + * + * @return true if there is (at least) one available event and it was sent out + * false otherwise + */ + public boolean injectNextEvent() { + if (this.getProcessor().hasNext()) { + ContentEvent event = this.getProcessor().nextEvent(); + this.getOutputStream().put(event); + return true; + } + return false; + } - /** - * Start sending events by calling {@link #injectNextEvent()}. If there are no available events, - * and that the stream is not entirely consumed, it will wait by calling - * {@link #waitForNewEvents()} before attempting to send again. - * </p> - * When the stream is entirely consumed, the last event is tagged accordingly and the processor gets the - * finished status. - * - */ - public void startSendingEvents () { - if (this.getOutputStream() == null) - throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set."); - - while (!this.getProcessor().isFinished()) { - if (!this.injectNextEvent()) { - try { - waitForNewEvents(); - } catch (Exception e) { - e.printStackTrace(); - break; - } - } + /** + * Start sending events by calling {@link #injectNextEvent()}. If there are no + * available events, and that the stream is not entirely consumed, it will + * wait by calling {@link #waitForNewEvents()} before attempting to send + * again. </p> When the stream is entirely consumed, the last event is tagged + * accordingly and the processor gets the finished status. + * + */ + public void startSendingEvents() { + if (this.getOutputStream() == null) + throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set."); + + while (!this.getProcessor().isFinished()) { + if (!this.injectNextEvent()) { + try { + waitForNewEvents(); + } catch (Exception e) { + e.printStackTrace(); + break; } - } - - /** - * Method to wait for an amount of time when there are no available events. - * Implementation of EntranceProcessingItem should override this method to - * implement non-blocking wait or to adjust the amount of time. - */ - protected void waitForNewEvents() throws Exception { - Thread.sleep(100); - } + } + } + } + + /** + * Method to wait for an amount of time when there are no available events. + * Implementation of EntranceProcessingItem should override this method to + * implement non-blocking wait or to adjust the amount of time. + */ + protected void waitForNewEvents() throws Exception { + Thread.sleep(100); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java index 02fb84d..c1ecabc 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java @@ -28,43 +28,42 @@ package com.yahoo.labs.samoa.topology; */ public interface ProcessingItem extends IProcessingItem { - /** - * Connects this processing item in a round robin fashion. The events will - * be distributed evenly between the instantiated processing items. - * - * @param inputStream - * Stream to connect this processing item. - * @return ProcessingItem - */ - public ProcessingItem connectInputShuffleStream(Stream inputStream); + /** + * Connects this processing item in a round robin fashion. The events will be + * distributed evenly between the instantiated processing items. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputShuffleStream(Stream inputStream); - /** - * Connects this processing item taking the event key into account. Events - * will be routed to the processing item according to the modulus of its key - * and the paralellism level. Ex.: key = 5 and paralellism = 2, 5 mod 2 = 1. - * Processing item responsible for 1 will receive this event. - * - * @param inputStream - * Stream to connect this processing item. - * @return ProcessingItem - */ - public ProcessingItem connectInputKeyStream(Stream inputStream); + /** + * Connects this processing item taking the event key into account. Events + * will be routed to the processing item according to the modulus of its key + * and the paralellism level. Ex.: key = 5 and paralellism = 2, 5 mod 2 = 1. + * Processing item responsible for 1 will receive this event. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputKeyStream(Stream inputStream); - /** - * Connects this processing item to the stream in a broadcast fashion. All - * processing items of this type will receive copy of the original event. - * - * @param inputStream - * Stream to connect this processing item. - * @return ProcessingItem - */ - public ProcessingItem connectInputAllStream(Stream inputStream); + /** + * Connects this processing item to the stream in a broadcast fashion. All + * processing items of this type will receive copy of the original event. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputAllStream(Stream inputStream); - - /** - * Gets processing item parallelism level. - * - * @return int - */ - public int getParallelism(); + /** + * Gets processing item parallelism level. + * + * @return int + */ + public int getParallelism(); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java index b496d35..0c54232 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java @@ -24,39 +24,38 @@ import com.yahoo.labs.samoa.core.ContentEvent; /** * Stream interface. - * + * * @author severien - * + * */ public interface Stream { - - /** - * Puts events into a platform specific data stream. - * - * @param event - */ - public void put(ContentEvent event); - - /** - * Sets the stream id which is represented by a name. - * - * @param stream - */ - //public void setStreamId(String stream); - - - /** - * Gets stream id. - * - * @return id - */ - public String getStreamId(); - - /** - * Set batch size - * - * @param batchSize - * the suggested size for batching messages on this stream - */ - public void setBatchSize(int batchsize); + + /** + * Puts events into a platform specific data stream. + * + * @param event + */ + public void put(ContentEvent event); + + /** + * Sets the stream id which is represented by a name. + * + * @param stream + */ + // public void setStreamId(String stream); + + /** + * Gets stream id. + * + * @return id + */ + public String getStreamId(); + + /** + * Set batch size + * + * @param batchSize + * the suggested size for batching messages on this stream + */ + public void setBatchSize(int batchsize); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java index 6ad93ed..dce4974 100755 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java @@ -21,65 +21,63 @@ package com.yahoo.labs.samoa.topology; */ public interface Topology { - /* - * Name - */ - /** - * Get the topology's name - * - * @return the name of the topology - */ - public String getTopologyName(); + /* + * Name + */ + /** + * Get the topology's name + * + * @return the name of the topology + */ + public String getTopologyName(); - /** - * Set the topology's name - * - * @param topologyName - * the name of the topology - */ - public void setTopologyName(String topologyName) ; + /** + * Set the topology's name + * + * @param topologyName + * the name of the topology + */ + public void setTopologyName(String topologyName); - /* - * Entrance Processing Items - */ - /** - * Add an EntranceProcessingItem to this topology - * - * @param epi - * the EntranceProcessingItem to be added - */ - void addEntranceProcessingItem(EntranceProcessingItem epi); - - - /* - * Processing Items - */ - /** - * Add a ProcessingItem to this topology - * with default parallelism level (i.e. 1) - * - * @param procItem - * the ProcessingItem to be added - */ - void addProcessingItem(IProcessingItem procItem); - - /** - * Add a ProcessingItem to this topology - * with an associated parallelism level - * - * @param procItem - * the ProcessingItem to be added - * @param parallelismHint - * the parallelism level - */ - void addProcessingItem(IProcessingItem procItem, int parallelismHint); - - /* - * Streams - */ - /** - * - * @param stream - */ - void addStream(Stream stream); + /* + * Entrance Processing Items + */ + /** + * Add an EntranceProcessingItem to this topology + * + * @param epi + * the EntranceProcessingItem to be added + */ + void addEntranceProcessingItem(EntranceProcessingItem epi); + + /* + * Processing Items + */ + /** + * Add a ProcessingItem to this topology with default parallelism level (i.e. + * 1) + * + * @param procItem + * the ProcessingItem to be added + */ + void addProcessingItem(IProcessingItem procItem); + + /** + * Add a ProcessingItem to this topology with an associated parallelism level + * + * @param procItem + * the ProcessingItem to be added + * @param parallelismHint + * the parallelism level + */ + void addProcessingItem(IProcessingItem procItem, int parallelismHint); + + /* + * Streams + */ + /** + * + * @param stream + */ + void addStream(Stream stream); }
