http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java new file mode 100644 index 0000000..e9a5aa1 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java @@ -0,0 +1,229 @@
package com.yahoo.labs.samoa.streams;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.core.EntranceProcessor;
import com.yahoo.labs.samoa.core.Processor;
import com.yahoo.labs.samoa.instances.Instance;
import com.yahoo.labs.samoa.instances.Instances;
import com.yahoo.labs.samoa.learners.InstanceContentEvent;
import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;

/**
 * Prequential Source Processor is the entrance processor for the Prequential Evaluation Task.
 * <p>
 * It reads instances from an {@link InstanceStream} and hands them out one at a time through
 * {@link #nextEvent()}. When a positive source delay is configured, a single-threaded timer
 * releases {@code batchSize} further events every {@code delay} microseconds.
 * <p>
 * Threading: {@link #hasNext()} and {@link #nextEvent()} run on the caller's thread, while the
 * release counter is advanced on the timer thread. The fields shared between the two are
 * {@code volatile}; the class is not otherwise synchronized.
 *
 * @author Arinto Murdopo
 */
public final class PrequentialSourceProcessor implements EntranceProcessor {

    private static final long serialVersionUID = 4169053337917578558L;

    private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class);

    /** False until the buffered {@link #firstInstance} has been handed out once. */
    private boolean isInited = false;
    private StreamSource streamSource;
    /** First instance of the stream, read eagerly so that the dataset header is available. */
    private Instance firstInstance;
    /** Maximum number of instances to emit; a negative value disables the limit. */
    private int numberInstances;
    private int numInstanceSent = 0;

    protected InstanceStream sourceStream;

    /*
     * ScheduledExecutorService to schedule sending events after each delay interval.
     * It is expected to have only one event in the queue at a time, so we need only
     * one thread in the pool.
     */
    private transient ScheduledExecutorService timer;
    private transient volatile ScheduledFuture<?> schedule = null;
    /*
     * Highest event index that may currently be emitted. Written only by the single timer
     * thread and read by the caller thread, hence volatile.
     */
    private volatile int readyEventIndex = 1; // No waiting for the first event
    /** Delay between two releases, in microseconds; values <= 0 disable throttling. */
    private int delay = 0;
    /** Number of events released on every timer tick. */
    private int batchSize = 1;
    /** Set by the caller thread, read by the timer thread, hence volatile. */
    private volatile boolean finished = false;

    /**
     * Not used by a source processor.
     *
     * @param event ignored
     * @return always {@code false}
     */
    @Override
    public boolean process(ContentEvent event) {
        // TODO: possible refactor of the super-interface implementation
        // of source processor does not need this method
        return false;
    }

    /**
     * @return {@code true} once the terminating event has been produced by {@link #nextEvent()}
     */
    @Override
    public boolean isFinished() {
        return finished;
    }

    /**
     * @return {@code true} when the processor is not finished and either no delay is configured
     *         or the timer has already released the next event
     */
    @Override
    public boolean hasNext() {
        return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex);
    }

    /** True when the stream is exhausted or the configured instance limit has been reached. */
    private boolean hasReachedEndOfStream() {
        return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances));
    }

    /**
     * Produces the next event of the stream.
     *
     * @return the terminating event (index -1, flagged as last) when the end of the stream or the
     *         instance limit is reached; otherwise the next instance event if one is ready;
     *         {@code null} when no event is ready yet
     */
    @Override
    public ContentEvent nextEvent() {
        InstanceContentEvent contentEvent = null;
        if (hasReachedEndOfStream()) {
            contentEvent = new InstanceContentEvent(-1, firstInstance, false, true);
            contentEvent.setLast(true);
            // set finished status _after_ tagging last event
            finished = true;
            // nothing is left to release, so free the timer thread right away
            stopTimer();
        } else if (hasNext()) {
            numInstanceSent++;
            contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true);

            // first call to this method will trigger the timer
            if (schedule == null && delay > 0) {
                schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), delay, delay,
                        TimeUnit.MICROSECONDS);
            }
        }
        return contentEvent;
    }

    /** Timer callback: releases another batch of events and stops the timer once finished. */
    private void increaseReadyEventIndex() {
        readyEventIndex += batchSize;
        // if we exceed the max, cancel the timer
        if (isFinished()) {
            stopTimer();
        }
    }

    /**
     * Cancels the periodic release task and shuts the executor down so that its worker thread
     * does not outlive the stream. Safe to call repeatedly and from either thread.
     */
    private void stopTimer() {
        ScheduledFuture<?> pending = schedule;
        if (pending != null) {
            pending.cancel(false);
        }
        ScheduledExecutorService executor = timer;
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Prepares the source stream, buffers its first instance and creates the release timer.
     *
     * @param id identifier assigned to this processor (only logged)
     */
    @Override
    public void onCreate(int id) {
        initStreamSource(sourceStream);
        timer = Executors.newScheduledThreadPool(1);
        logger.debug("Creating PrequentialSourceProcessor with id {}", id);
    }

    /**
     * Creates a new processor configured like {@code p}: same underlying stream, instance
     * limit, source delay and batch size. Runtime state (counters, timer) is not copied.
     *
     * @param p the processor to copy; must be a {@code PrequentialSourceProcessor}
     * @return the newly created processor
     */
    @Override
    public Processor newProcessor(Processor p) {
        PrequentialSourceProcessor newProcessor = new PrequentialSourceProcessor();
        PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) p;
        if (originProcessor.getStreamSource() != null) {
            newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
        } else if (originProcessor.sourceStream != null) {
            // origin was configured but onCreate has not run yet
            newProcessor.setStreamSource(originProcessor.sourceStream);
        }
        // carry the configuration over; otherwise the copy would use a limit of 0 instances
        newProcessor.setMaxNumInstances(originProcessor.getMaxNumInstances());
        newProcessor.setSourceDelay(originProcessor.getSourceDelay());
        newProcessor.setDelayBatchSize(originProcessor.batchSize);
        return newProcessor;
    }

    /**
     * @return the wrapper around the source stream, or {@code null} if the stream has not been
     *         initialized yet
     */
    public StreamSource getStreamSource() {
        return streamSource;
    }

    /**
     * Sets the stream to read from. The stream is only prepared later, by
     * {@link #onCreate(int)} or {@link #getDataset()}.
     *
     * @param stream the stream to read instances from
     */
    public void setStreamSource(InstanceStream stream) {
        this.sourceStream = stream;
    }

    /**
     * Returns the dataset of the first instance, initializing the stream first if that has not
     * happened yet.
     *
     * @return the dataset the first instance of the stream belongs to
     */
    public Instances getDataset() {
        if (firstInstance == null) {
            initStreamSource(sourceStream);
        }
        return firstInstance.dataset();
    }

    /** Returns the buffered first instance on the first call, later instances afterwards. */
    private Instance nextInstance() {
        if (this.isInited) {
            return streamSource.nextInstance().getData();
        } else {
            this.isInited = true;
            return firstInstance;
        }
    }

    /** Prepares the stream for use, wraps it and buffers its first instance. */
    private void initStreamSource(InstanceStream stream) {
        if (stream instanceof AbstractOptionHandler) {
            ((AbstractOptionHandler) (stream)).prepareForUse();
        }

        this.streamSource = new StreamSource(stream);
        firstInstance = streamSource.nextInstance().getData();
    }

    /**
     * @param value maximum number of instances to emit; a negative value means no limit
     */
    public void setMaxNumInstances(int value) {
        numberInstances = value;
    }

    /**
     * @return the configured maximum number of instances
     */
    public int getMaxNumInstances() {
        return this.numberInstances;
    }

    /**
     * @param delay delay between two releases in microseconds; values <= 0 disable throttling
     */
    public void setSourceDelay(int delay) {
        this.delay = delay;
    }

    /**
     * @return the configured delay in microseconds
     */
    public int getSourceDelay() {
        return this.delay;
    }

    /**
     * @param batch number of events released on every timer tick
     */
    public void setDelayBatchSize(int batch) {
        this.batchSize = batch;
    }

    /** Timer task that releases the next batch of events on the owning processor. */
    private static final class DelayTimeoutHandler implements Runnable {

        private final PrequentialSourceProcessor processor;

        DelayTimeoutHandler(PrequentialSourceProcessor processor) {
            this.processor = processor;
        }

        @Override
        public void run() {
            processor.increaseReadyEventIndex();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java new file mode 100644 index 0000000..453d02d --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java @@ -0,0 +1,90 @@
package com.yahoo.labs.samoa.streams;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.yahoo.labs.samoa.moa.core.Example;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;
import com.yahoo.labs.samoa.instances.Instance;

/**
 * Serializable holder for an {@link InstanceStream}; every query is forwarded to the
 * wrapped stream.
 */
public class StreamSource implements java.io.Serializable {

    private static final long serialVersionUID = 3974668694861231236L;

    /** The wrapped stream. */
    protected InstanceStream stream;

    /**
     * Creates a source that reads from the given stream.
     *
     * @param stream the stream to wrap
     */
    public StreamSource(InstanceStream stream) {
        this.stream = stream;
    }

    /**
     * @return the wrapped stream
     */
    public InstanceStream getStream() {
        return stream;
    }

    /**
     * Replaces the wrapped stream.
     *
     * @param stream the new stream to read from
     */
    public void setStream(InstanceStream stream) {
        this.stream = stream;
    }

    /**
     * @return whatever the wrapped stream reports from {@code hasMoreInstances()}
     */
    public boolean hasMoreInstances() {
        return stream.hasMoreInstances();
    }

    /**
     * @return the next example delivered by the wrapped stream
     */
    public Example<Instance> nextInstance() {
        return stream.nextInstance();
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java new file mode 100644 index 0000000..2e66e4b --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java @@ -0,0 +1,185 @@ +package com.yahoo.labs.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +/** + * License + */ + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.instances.Instance; +import com.yahoo.labs.samoa.instances.Instances; +import com.yahoo.labs.samoa.learners.InstanceContentEvent; +import com.yahoo.labs.samoa.moa.streams.InstanceStream; +import com.yahoo.labs.samoa.topology.Stream; + +/** + * The Class StreamSourceProcessor. + */ +public class StreamSourceProcessor implements Processor { + + /** The Constant logger. */ + private static final Logger logger = LoggerFactory + .getLogger(StreamSourceProcessor.class); + + /** + * + */ + private static final long serialVersionUID = -204182279475432739L; + + /** The stream source. */ + private StreamSource streamSource; + + /** + * Gets the stream source. + * + * @return the stream source + */ + public StreamSource getStreamSource() { + return streamSource; + } + + /** + * Sets the stream source. + * + * @param stream the new stream source + */ + public void setStreamSource(InstanceStream stream) { + this.streamSource = new StreamSource(stream); + firstInstance = streamSource.nextInstance().getData(); + } + + /** The number instances sent. */ + private long numberInstancesSent = 0; + + /** + * Send instances. 
+ * @param inputStream the input stream + * @param numberInstances the number instances + * @param isTraining the is training + */ + public void sendInstances(Stream inputStream, + int numberInstances, boolean isTraining, boolean isTesting) { + int numberSamples = 0; + + while (streamSource.hasMoreInstances() + && numberSamples < numberInstances) { + + numberSamples++; + numberInstancesSent++; + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + numberInstancesSent, nextInstance(), isTraining, isTesting); + + + inputStream.put(instanceContentEvent); + } + + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + numberInstancesSent, null, isTraining, isTesting); + instanceContentEvent.setLast(true); + inputStream.put(instanceContentEvent); + } + + /** + * Send end evaluation instance. + * + * @param inputStream the input stream + */ + public void sendEndEvaluationInstance(Stream inputStream) { + InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, firstInstance,false, true); + inputStream.put(instanceContentEvent); + } + + /** + * Next instance. + * + * @return the instance + */ + protected Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; + } + } + + /** The is inited. */ + protected boolean isInited = false; + + /** The first instance. */ + protected Instance firstInstance; + + //@Override + /** + * On remove. 
+ */ + protected void onRemove() { + } + + /* (non-Javadoc) + * @see samoa.core.Processor#onCreate(int) + */ + @Override + public void onCreate(int id) { + // TODO Auto-generated method stub + } + + /* (non-Javadoc) + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { +// StreamSourceProcessor newProcessor = new StreamSourceProcessor(); +// StreamSourceProcessor originProcessor = (StreamSourceProcessor) sourceProcessor; +// if (originProcessor.getStreamSource() != null){ +// newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); +// } + //return newProcessor; + return null; + } + + /** + * On event. + * + * @param event the event + * @return true, if successful + */ + @Override + public boolean process(ContentEvent event) { + return false; + } + + + /** + * Gets the dataset. + * + * @return the dataset + */ + public Instances getDataset() { + return firstInstance.dataset(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java new file mode 100644 index 0000000..25541e2 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java @@ -0,0 +1,67 @@ +package com.yahoo.labs.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.InputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * An interface for FileStream's source (Local FS, HDFS,...) + * @author Casey + */ +public interface FileStreamSource extends Serializable { + + /** + * Init the source with file/directory path and file extension + * @param path + * File or directory path + * @param ext + * File extension to be used to filter files in a directory. + * If null, all files in the directory are accepted. + */ + public void init(String path, String ext); + + /** + * Reset the source + */ + public void reset() throws IOException; + + /** + * Retrieve InputStream for next file. + * This method will return null if we are at the last file + * in the list. + * + * @return InputStream for next file in the list + */ + public InputStream getNextInputStream(); + + /** + * Retrieve InputStream for current file. + * The "current pointer" is moved forward + * with getNextInputStream method. So if there was no + * invocation of getNextInputStream, this method will + * return null. 
+ * + * @return InputStream for current file in the list + */ + public InputStream getCurrentInputStream(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java new file mode 100644 index 0000000..079423c --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java @@ -0,0 +1,150 @@ +package com.yahoo.labs.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; + +/** + * Source for FileStream for HDFS files + * @author Casey + */ +public class HDFSFileStreamSource implements FileStreamSource { + + /** + * + */ + private static final long serialVersionUID = -3887354805787167400L; + + private transient InputStream fileStream; + private transient Configuration config; + private List<String> filePaths; + private int currentIndex; + + public HDFSFileStreamSource(){ + this.currentIndex = -1; + } + + public void init(String path, String ext) { + this.init(this.getDefaultConfig(), path, ext); + } + + public void init(Configuration config, String path, String ext) { + this.config = config; + this.filePaths = new ArrayList<String>(); + Path hdfsPath = new Path(path); + FileSystem fs; + try { + fs = FileSystem.get(config); + FileStatus fileStat = fs.getFileStatus(hdfsPath); + if (fileStat.isDirectory()) { + Path filterPath = hdfsPath; + if (ext != null) { + filterPath = new Path(path.toString(),"*."+ext); + } + else { + filterPath = new Path(path.toString(),"*"); + } + FileStatus[] filesInDir = fs.globStatus(filterPath); + for (int i=0; i<filesInDir.length; i++) { + if (filesInDir[i].isFile()) { + filePaths.add(filesInDir[i].getPath().toString()); + } + } + } + else { + this.filePaths.add(path); + } + } + catch(IOException ioe) { + throw new RuntimeException("Failed getting list of files at:"+path,ioe); + } + + this.currentIndex = -1; + } + + private Configuration getDefaultConfig() { + String hadoopHome = System.getenv("HADOOP_HOME"); + Configuration conf = new Configuration(); + if (hadoopHome != null) { + java.nio.file.Path coreSitePath = 
FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml"); + java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml"); + conf.addResource(new Path(coreSitePath.toAbsolutePath().toString())); + conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString())); + } + return conf; + } + + public void reset() throws IOException { + this.currentIndex = -1; + this.closeFileStream(); + } + + private void closeFileStream() { + IOUtils.closeStream(fileStream); + } + + public InputStream getNextInputStream() { + this.closeFileStream(); + if (this.currentIndex >= (this.filePaths.size()-1)) return null; + + this.currentIndex++; + String filePath = this.filePaths.get(currentIndex); + + Path hdfsPath = new Path(filePath); + FileSystem fs; + try { + fs = FileSystem.get(config); + fileStream = fs.open(hdfsPath); + } + catch(IOException ioe) { + this.closeFileStream(); + throw new RuntimeException("Failed opening file:"+filePath,ioe); + } + + return fileStream; + } + + public InputStream getCurrentInputStream() { + return fileStream; + } + + protected int getFilePathListSize() { + if (filePaths != null) + return filePaths.size(); + return 0; + } + + protected String getFilePathAt(int index) { + if (filePaths != null && filePaths.size() > index) + return filePaths.get(index); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java new file mode 100644 index 0000000..c0ab44f --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java @@ -0,0 +1,131 @@ +package com.yahoo.labs.samoa.streams.fs; + +/* + * #%L + * 
SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.List; + +/** + * Source for FileStream for local files + * @author Casey + */ +public class LocalFileStreamSource implements FileStreamSource { + /** + * + */ + private static final long serialVersionUID = 3986511547525870698L; + + private transient InputStream fileStream; + private List<String> filePaths; + private int currentIndex; + + public LocalFileStreamSource(){ + this.currentIndex = -1; + } + + public void init(String path, String ext) { + this.filePaths = new ArrayList<String>(); + File fileAtPath = new File(path); + if (fileAtPath.isDirectory()) { + File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext)); + for (int i=0; i<filesInDir.length; i++) { + filePaths.add(filesInDir[i].getAbsolutePath()); + } + } + else { + this.filePaths.add(path); + } + this.currentIndex = -1; + } + + public void reset() throws IOException { + this.currentIndex = -1; + this.closeFileStream(); + } + + private void closeFileStream() { + if (fileStream != null) { + try { + fileStream.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } + } + + public InputStream getNextInputStream() { + 
this.closeFileStream(); + + if (this.currentIndex >= (this.filePaths.size()-1)) return null; + + this.currentIndex++; + String filePath = this.filePaths.get(currentIndex); + + File file = new File(filePath); + try { + fileStream = new FileInputStream(file); + } + catch(IOException ioe) { + this.closeFileStream(); + throw new RuntimeException("Failed opening file:"+filePath,ioe); + } + + return fileStream; + } + + public InputStream getCurrentInputStream() { + return fileStream; + } + + protected int getFilePathListSize() { + if (filePaths != null) + return filePaths.size(); + return 0; + } + + protected String getFilePathAt(int index) { + if (filePaths != null && filePaths.size() > index) + return filePaths.get(index); + return null; + } + + private class FileExtensionFilter implements FilenameFilter { + private String extension; + FileExtensionFilter(String ext) { + extension = ext; + } + + @Override + public boolean accept(File dir, String name) { + File f = new File(dir,name); + if (extension == null) + return f.isFile(); + else + return f.isFile() && name.toLowerCase().endsWith("."+extension); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java new file mode 100644 index 0000000..4af3764 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java @@ -0,0 +1,174 @@ +package com.yahoo.labs.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.FileOption; +import com.github.javacliparser.FloatOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; +import com.yahoo.labs.samoa.evaluation.ClusteringEvaluatorProcessor; +import com.yahoo.labs.samoa.learners.Learner; +import com.yahoo.labs.samoa.learners.clusterers.simple.ClusteringDistributorProcessor; +import com.yahoo.labs.samoa.learners.clusterers.simple.DistributedClusterer; +import com.yahoo.labs.samoa.moa.streams.InstanceStream; +import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream; +import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; +import com.yahoo.labs.samoa.streams.ClusteringEntranceProcessor; +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; +import com.yahoo.labs.samoa.topology.TopologyBuilder; + +/** + * A task that runs and evaluates a distributed clustering algorithm. 
+ * + */ +public class ClusteringEvaluation implements Task, Configurable { + + private static final long serialVersionUID = -8246537378371580550L; + + private static final int DISTRIBUTOR_PARALLELISM = 1; + + private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class); + + public ClassOption learnerOption = new ClassOption("learner", 'l', "Clustering to run.", Learner.class, DistributedClusterer.class.getName()); + + public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', "Input stream.", InstanceStream.class, + RandomRBFGeneratorEvents.class.getName()); + + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', "Maximum number of instances to test/train on (-1 = no limit).", 100000, -1, + Integer.MAX_VALUE); + + public IntOption measureCollectionTypeOption = new IntOption("measureCollectionType", 'm', "Type of measure collection", 0, 0, Integer.MAX_VALUE); + + public IntOption timeLimitOption = new IntOption("timeLimit", 't', "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, + Integer.MAX_VALUE); + + public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', "How many instances between samples of the learning performance.", 1000, 0, + Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", "Clustering__" + + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", null, "csv", true); + + public FloatOption samplingThresholdOption = new FloatOption("samplingThreshold", 'a', "Ratio of instances sampled that will be used for evaluation.", 0.5, + 0.0, 1.0); + + private ClusteringEntranceProcessor source; + private InstanceStream streamTrain; + private ClusteringDistributorProcessor distributor; + private Stream distributorStream; + private Stream 
evaluationStream; + + // Default=0: no delay/waiting + public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', "How many miliseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); + + private Learner learner; + private ClusteringEvaluatorProcessor evaluator; + + private Topology topology; + private TopologyBuilder builder; + + public void getDescription(StringBuilder sb) { + sb.append("Clustering evaluation"); + } + + @Override + public void init() { + // TODO remove the if statement theoretically, dynamic binding will work here! for now, the if statement is used by Storm + + if (builder == null) { + logger.warn("Builder was not initialized, initializing it from the Task"); + + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // instantiate ClusteringEntranceProcessor and its output stream (sourceStream) + source = new ClusteringEntranceProcessor(); + streamTrain = this.streamTrainOption.getValue(); + source.setStreamSource(streamTrain); + builder.addEntranceProcessor(source); + source.setSamplingThreshold(samplingThresholdOption.getValue()); + source.setMaxNumInstances(instanceLimitOption.getValue()); + logger.debug("Successfully instantiated ClusteringEntranceProcessor"); + + Stream sourceStream = builder.createStream(source); + // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the EntrancePI + + // distribution of instances and sampling for evaluation + distributor = new ClusteringDistributorProcessor(); + builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM); + builder.connectInputShuffleStream(sourceStream, distributor); + distributorStream = builder.createStream(distributor); + distributor.setOutputStream(distributorStream); + evaluationStream = 
builder.createStream(distributor); + distributor.setEvaluationStream(evaluationStream); // passes evaluation events along + logger.debug("Successfully instantiated Distributor"); + + // instantiate learner and connect it to distributorStream + learner = this.learnerOption.getValue(); + learner.init(builder, source.getDataset(), 1); + builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor()); + logger.debug("Successfully instantiated Learner"); + + evaluator = new ClusteringEvaluatorProcessor.Builder( + sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()) + .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build(); + + builder.addProcessor(evaluator); + for (Stream evaluatorPiInputStream:learner.getResultStreams()) { + builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); + } + builder.connectInputAllStream(evaluationStream, evaluator); + logger.debug("Successfully instantiated EvaluatorProcessor"); + + topology = builder.build(); + logger.debug("Successfully built the topology"); + } + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiated TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + + public Topology getTopology() { + return topology; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java new file 
mode 100644 index 0000000..70c44a1 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java @@ -0,0 +1,206 @@ +package com.yahoo.labs.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.FileOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; +import com.yahoo.labs.samoa.evaluation.BasicClassificationPerformanceEvaluator; +import com.yahoo.labs.samoa.evaluation.BasicRegressionPerformanceEvaluator; +import com.yahoo.labs.samoa.evaluation.ClassificationPerformanceEvaluator; +import com.yahoo.labs.samoa.evaluation.PerformanceEvaluator; +import com.yahoo.labs.samoa.evaluation.EvaluatorProcessor; +import com.yahoo.labs.samoa.evaluation.RegressionPerformanceEvaluator; +import com.yahoo.labs.samoa.learners.ClassificationLearner; +import com.yahoo.labs.samoa.learners.Learner; +import com.yahoo.labs.samoa.learners.RegressionLearner; +import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree; +import com.yahoo.labs.samoa.moa.streams.InstanceStream; +import com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator; +import 
com.yahoo.labs.samoa.streams.PrequentialSourceProcessor; +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; +import com.yahoo.labs.samoa.topology.TopologyBuilder; + +/** + * Prequential evaluation is a scheme for evaluating the performance of online classifiers: each instance is first used to test the model and + * then the same instance is used to train it (test-then-train). + * + * @author Arinto Murdopo + * + */ +public class PrequentialEvaluation implements Task, Configurable { + + private static final long serialVersionUID = -8246537378371580550L; + + private static Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class); + + public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); + + public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.", InstanceStream.class, + RandomTreeGenerator.class.getName()); + + public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', "Classification performance evaluation method.", + PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName()); + + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', "Maximum number of instances to test/train on (-1 = no limit).", 1000000, -1, + Integer.MAX_VALUE); + + public IntOption timeLimitOption = new IntOption("timeLimit", 't', "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, + Integer.MAX_VALUE); + + public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', "How many instances between samples of the learning performance.", 100000, + 0, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", "Prequential_" + + new
SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", null, "csv", true); + + // Default=0: no delay/waiting + public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); + // Batch size to delay the incoming stream: delay of x milliseconds after each batch + public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE); + + private PrequentialSourceProcessor preqSource; + + // private PrequentialSourceTopologyStarter preqStarter; + + // private EntranceProcessingItem sourcePi; + + private Stream sourcePiOutputStream; + + private Learner classifier; + + private EvaluatorProcessor evaluator; + + // private ProcessingItem evaluatorPi; + + // private Stream evaluatorPiInputStream; + + private Topology prequentialTopology; + + private TopologyBuilder builder; + + public void getDescription(StringBuilder sb, int indent) { + sb.append("Prequential evaluation"); + } + + @Override + public void init() { + // TODO remove the if statement + // theoretically, dynamic binding will work here! + // test later! 
+ // for now, the if statement is used by Storm + + if (builder == null) { + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // instantiate PrequentialSourceProcessor and its output stream (sourcePiOutputStream) + preqSource = new PrequentialSourceProcessor(); + preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue()); + preqSource.setMaxNumInstances(instanceLimitOption.getValue()); + preqSource.setSourceDelay(sourceDelayOption.getValue()); + preqSource.setDelayBatchSize(batchDelayOption.getValue()); + builder.addEntranceProcessor(preqSource); + logger.debug("Successfully instantiating PrequentialSourceProcessor"); + + // preqStarter = new PrequentialSourceTopologyStarter(preqSource, instanceLimitOption.getValue()); + // sourcePi = builder.createEntrancePi(preqSource, preqStarter); + // sourcePiOutputStream = builder.createStream(sourcePi); + + sourcePiOutputStream = builder.createStream(preqSource); + // preqStarter.setInputStream(sourcePiOutputStream); + + // instantiate classifier and connect it to sourcePiOutputStream + classifier = this.learnerOption.getValue(); + classifier.init(builder, preqSource.getDataset(), 1); + builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor()); + logger.debug("Successfully instantiating Classifier"); + + PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue(); + if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) { + evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier); + } + evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue) + .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build(); + + // evaluatorPi = 
builder.createPi(evaluator); + // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream); + builder.addProcessor(evaluator); + for (Stream evaluatorPiInputStream:classifier.getResultStreams()) { + builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); + } + + logger.debug("Successfully instantiating EvaluatorProcessor"); + + prequentialTopology = builder.build(); + logger.debug("Successfully building the topology"); + } + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() + // for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + + public Topology getTopology() { + return prequentialTopology; + } + // + // @Override + // public TopologyStarter getTopologyStarter() { + // return this.preqStarter; + // } + + private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) { + return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) || + (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator); + } + + private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) { + if (learner instanceof RegressionLearner) { + return new BasicRegressionPerformanceEvaluator(); + } + // Default to BasicClassificationPerformanceEvaluator for all other cases + return new BasicClassificationPerformanceEvaluator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java ---------------------------------------------------------------------- diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java new file mode 100644 index 0000000..41b47e4 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java @@ -0,0 +1,61 @@ +package com.yahoo.labs.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.Topology; + +/** + * Task interface, the mother of all SAMOA tasks! + */ +public interface Task { + + /** + * Initialize this SAMOA task, + * i.e. create and connect ProcessingItems and Streams + * and initialize the topology + */ + public void init(); + + /** + * Return the final topology object to be executed in the cluster + * @return topology object to be submitted to be executed in the cluster + */ + public Topology getTopology(); + + // /** + // * Return the entrance processor to start SAMOA topology + // * The logic to start the topology should be implemented here + // * @return entrance processor to start the topology + // */ + // public TopologyStarter getTopologyStarter(); + + /** + * Sets the factory. + * TODO: propose to hide factory from task, + * i.e. 
Task will only see TopologyBuilder, + * and factory creation will be handled by TopologyBuilder + * + * @param factory the new factory + */ + public void setFactory(ComponentFactory factory) ; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java new file mode 100644 index 0000000..c0f0cc3 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java @@ -0,0 +1,108 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.EntranceProcessor; + +/** + * Helper class for EntranceProcessingItem implementation. 
+ * @author Anh Thu Vu + * + */ +public abstract class AbstractEntranceProcessingItem implements EntranceProcessingItem { + private EntranceProcessor processor; + private String name; + private Stream outputStream; + + /* + * Constructor + */ + public AbstractEntranceProcessingItem() { + this(null); + } + public AbstractEntranceProcessingItem(EntranceProcessor processor) { + this.processor = processor; + } + + /* + * Processor + */ + /** + * Set the entrance processor for this EntranceProcessingItem + * @param processor + * the processor + */ + protected void setProcessor(EntranceProcessor processor) { + this.processor = processor; + } + + /** + * Get the EntranceProcessor of this EntranceProcessingItem. + * @return the EntranceProcessor + */ + public EntranceProcessor getProcessor() { + return this.processor; + } + + /* + * Name/ID + */ + /** + * Set the name (or ID) of this EntranceProcessingItem + * @param name the name (or ID) + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the name (or ID) of this EntranceProcessingItem + * @return the name (or ID) + */ + public String getName() { + return this.name; + } + + /* + * Output Stream + */ + /** + * Set the output stream of this EntranceProcessingItem. + * An EntranceProcessingItem has a single output stream; once one is set, passing a + * different stream throws IllegalStateException (passing the same stream again is allowed). + * @return this EntranceProcessingItem + */ + public EntranceProcessingItem setOutputStream(Stream outputStream) { + if (this.outputStream != null && this.outputStream != outputStream) { + throw new IllegalStateException("Cannot overwrite output stream of EntranceProcessingItem"); + } else + this.outputStream = outputStream; + return this; + } + + /** + * Get the output stream of this EntranceProcessingItem.
+ * @return the output stream + */ + public Stream getOutputStream() { + return this.outputStream; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java new file mode 100644 index 0000000..d0f04f7 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java @@ -0,0 +1,161 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.utils.PartitioningScheme; + +/** + * Abstract ProcessingItem + * + * Helper for implementation of ProcessingItem. It has basic information + * for a ProcessingItem: name, parallelismLevel and a processor. + * Subclass of this class needs to implement {@link #addInputStream(Stream, PartitioningScheme)}. 
+ * + * @author Anh Thu Vu + * + */ +public abstract class AbstractProcessingItem implements ProcessingItem { + private String name; + private int parallelism; + private Processor processor; + + /* + * Constructor + */ + public AbstractProcessingItem() { + this(null); + } + public AbstractProcessingItem(Processor processor) { + this(processor,1); + } + public AbstractProcessingItem(Processor processor, int parallelism) { + this.processor = processor; + this.parallelism = parallelism; + } + + /* + * Processor + */ + /** + * Set the processor for this ProcessingItem + * @param processor + * the processor + */ + protected void setProcessor(Processor processor) { + this.processor = processor; + } + + /** + * Get the processor of this ProcessingItem + * @return the processor + */ + public Processor getProcessor() { + return this.processor; + } + + /* + * Parallelism + */ + /** + * Set the parallelism factor of this ProcessingItem + * @param parallelism the parallelism factor + */ + protected void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + /** + * Get the parallelism factor of this ProcessingItem + * @return the parallelism factor + */ + @Override + public int getParallelism() { + return this.parallelism; + } + + /* + * Name/ID + */ + /** + * Set the name (or ID) of this ProcessingItem + * @param name + * the name/ID + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the name (or ID) of this ProcessingItem + * @return the name/ID + */ + public String getName() { + return this.name; + } + + /* + * Add input streams + */ + /** + * Add an input stream to this ProcessingItem + * + * @param inputStream + * the input stream to add + * @param scheme + * partitioning scheme associated with this ProcessingItem and the input stream + * @return this ProcessingItem + */ + protected abstract ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme); + + /** + * Add an input stream to this ProcessingItem with SHUFFLE scheme +
* + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE); + } + + /** + * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputKeyStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY); + } + + /** + * Add an input stream to this ProcessingItem with BROADCAST scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputAllStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.BROADCAST); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java new file mode 100644 index 0000000..b3544ed --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java @@ -0,0 +1,115 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; + +/** + * Abstract Stream + * + * Helper for implementation of Stream. It holds the basic information + * for a Stream: streamID, source ProcessingItem and suggested batch size (initially 1). + * Subclasses need to implement {@link #put(ContentEvent)}. + * + * @author Anh Thu Vu + * + */ + +public abstract class AbstractStream implements Stream { + private String streamID; + private IProcessingItem sourcePi; + private int batchSize; + + /* + * Constructor + */ + public AbstractStream() { + this(null); + } + public AbstractStream(IProcessingItem sourcePi) { + this.sourcePi = sourcePi; + this.batchSize = 1; + } + + /** + * Get source processing item of this stream + * @return the source processing item (null if none was given at construction) + */ + public IProcessingItem getSourceProcessingItem() { + return this.sourcePi; + } + + /* + * Process event + */ + @Override + /** + * Send a ContentEvent + * @param event + * the ContentEvent to be sent + */ + public abstract void put(ContentEvent event); + + /* + * Stream name + */ + /** + * Get name (ID) of this stream + * @return the name (ID) + */ + @Override + public String getStreamId() { + return this.streamID; + } + + /** + * Set the name (ID) of this stream + * @param streamID + * the name (ID) + */ + public void setStreamId (String streamID) { + this.streamID = streamID; + } + + /* + * Batch size + */ + /** + * Set suggested batch size + * + * @param batchSize + * the suggested batch size + * + */ + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** + * Get suggested batch size + * + * @return the suggested batch size + */ + public int getBatchSize() { + return this.batchSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java ----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java new file mode 100755 index 0000000..53385b1 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java @@ -0,0 +1,133 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashSet; +import java.util.Set; + +/** + * Topology abstract class. 
+ * + * It manages basic information of a topology: name, sets of Streams and ProcessingItems + * + */ +public abstract class AbstractTopology implements Topology { + + private String topoName; + private Set<Stream> streams; + private Set<IProcessingItem> processingItems; + private Set<EntranceProcessingItem> entranceProcessingItems; + + protected AbstractTopology(String name) { + this.topoName = name; + this.streams = new HashSet<>(); + this.processingItems = new HashSet<>(); + this.entranceProcessingItems = new HashSet<>(); + } + + /** + * Gets the name of this topology + * + * @return name of the topology + */ + public String getTopologyName() { + return this.topoName; + } + + /** + * Sets the name of this topology + * + * @param topologyName + * name of the topology + */ + public void setTopologyName(String topologyName) { + this.topoName = topologyName; + } + + /** + * Adds an entrance processing item to the topology; it is also registered as a regular processing item. + * + * @param epi + * Entrance processing item + */ + public void addEntranceProcessingItem(EntranceProcessingItem epi) { + this.entranceProcessingItems.add(epi); + this.addProcessingItem(epi); + } + + /** + * Gets entrance processing items in the topology + * + * @return the set of entrance processing items + */ + public Set<EntranceProcessingItem> getEntranceProcessingItems() { + return this.entranceProcessingItems; + } + + /** + * Add processing item to topology. + * + * @param procItem + * Processing item. + */ + public void addProcessingItem(IProcessingItem procItem) { + addProcessingItem(procItem, 1); + } + + /** + * Add processing item to topology. + * + * @param procItem + * Processing item. + * @param parallelismHint + * Processing item parallelism level (not used by this base implementation).
+ */ + public void addProcessingItem(IProcessingItem procItem, int parallelismHint) { + this.processingItems.add(procItem); + } + + /** + * Gets processing items in the topology (including entrance processing items) + * + * @return the set of processing items + */ + public Set<IProcessingItem> getProcessingItems() { + return this.processingItems; + } + + /** + * Add stream to topology. + * + * @param stream the stream to add + */ + public void addStream(Stream stream) { + this.streams.add(stream); + } + + /** + * Gets streams in the topology + * + * @return the set of streams + */ + public Set<Stream> getStreams() { + return this.streams; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java new file mode 100644 index 0000000..433f516 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java @@ -0,0 +1,78 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * #L% + */ + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; + +/** + * ComponentFactory interface. Provides platform specific components. + */ +public interface ComponentFactory { + + /** + * Creates a platform specific processing item with the specified processor. + * + * @param processor + * contains the logic for this processing item. + * @return ProcessingItem + */ + public ProcessingItem createPi(Processor processor); + + /** + * Creates a platform specific processing item with the specified processor. Additionally sets the parallelism level. + * + * @param processor + * contains the logic for this processing item. + * @param parallelism + * defines how many instances of this processing item will be created. + * @return ProcessingItem + */ + public ProcessingItem createPi(Processor processor, int parallelism); + + /** + * Creates a platform specific processing item with the specified processor that is the entrance point in the topology. This processing item can either + * generate a stream of data or connect to an external stream of data. + * + * @param entranceProcessor + * contains the logic for this processing item. + * @return EntranceProcessingItem + */ + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor); + + /** + * Creates a platform specific stream. + * + * @param sourcePi + * source processing item which will provide the events for this stream. + * @return Stream + */ + public Stream createStream(IProcessingItem sourcePi); + + /** + * Creates a platform specific topology. + * + * @param topoName + * Topology name.
+ * @return Topology + */ + public Topology createTopology(String topoName); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java new file mode 100644 index 0000000..32ed109 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java @@ -0,0 +1,46 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.EntranceProcessor; + +/** + * Entrance processing item interface. + */ +public interface EntranceProcessingItem extends IProcessingItem { + + @Override + /** + * Gets the processing item processor. + * + * @return the embedded EntranceProcessor. + */ + public EntranceProcessor getProcessor(); + + /** + * Set the single output stream for this EntranceProcessingItem. + * + * @param stream + * the stream + * @return the current instance of the EntranceProcessingItem for fluent interface. 
+ */ + public EntranceProcessingItem setOutputStream(Stream stream); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java new file mode 100644 index 0000000..7a70dc4 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java @@ -0,0 +1,47 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.Processor; + +/** + * Base interface for processing items; EntranceProcessingItem extends it. + * + * @author severien + * + */ +public interface IProcessingItem { + + /** + * Gets the processing item processor. + * + * @return Processor + */ + public Processor getProcessor(); + + /** + * Sets processing item name.
+ * + * @param name + */ + //public void setName(String name); + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java new file mode 100644 index 0000000..8499f80 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java @@ -0,0 +1,46 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.tasks.Task; + +/** + * Submitter interface for programmatically deploying platform specific topologies. + * + * @author severien + * + */ +public interface ISubmitter { + + /** + * Deploy a specific task to a platform. + * + * @param task + */ + public void deployTask(Task task); + + /** + * Sets if the task should run locally or distributed. 
+ * + * @param bool + */ + public void setLocal(boolean bool); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java new file mode 100644 index 0000000..2e8758f --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java @@ -0,0 +1,85 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.EntranceProcessor; + +/** + * Implementation of EntranceProcessingItem for local engines (Simple, Multithreads) + * + * @author Anh Thu Vu + * + */ +public class LocalEntranceProcessingItem extends AbstractEntranceProcessingItem { + public LocalEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + /** + * If there are available events, first event in the queue will be + * sent out on the output stream. 
+ * @return true if there is (at least) one available event and it was sent out + * false otherwise + */ + public boolean injectNextEvent() { + if (this.getProcessor().hasNext()) { + ContentEvent event = this.getProcessor().nextEvent(); + this.getOutputStream().put(event); + return true; + } + return false; + } + + /** + * Start sending events by calling {@link #injectNextEvent()}. If there are no available events, + * and the stream is not entirely consumed, it will wait by calling + * {@link #waitForNewEvents()} before attempting to send again. + * <p> + * When the stream is entirely consumed, the last event is tagged accordingly and the processor gets the + * finished status. + * + */ + public void startSendingEvents () { + if (this.getOutputStream() == null) + throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set."); + + while (!this.getProcessor().isFinished()) { + if (!this.injectNextEvent()) { + try { + waitForNewEvents(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + } + + /** + * Method to wait for an amount of time when there are no available events. + * Implementation of EntranceProcessingItem should override this method to + * implement non-blocking wait or to adjust the amount of time. 
+ */ + protected void waitForNewEvents() throws Exception { + Thread.sleep(100); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java new file mode 100644 index 0000000..02fb84d --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java @@ -0,0 +1,70 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Processing item interface. + * + * @author severien + * + */ +public interface ProcessingItem extends IProcessingItem { + + /** + * Connects this processing item in a round robin fashion. The events will + * be distributed evenly between the instantiated processing items. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputShuffleStream(Stream inputStream); + + /** + * Connects this processing item taking the event key into account. Events + * will be routed to the processing item according to the modulus of its key + * and the parallelism level. 
Ex.: key = 5 and parallelism = 2, 5 mod 2 = 1. + * Processing item responsible for 1 will receive this event. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputKeyStream(Stream inputStream); + + /** + * Connects this processing item to the stream in a broadcast fashion. All + * processing items of this type will receive a copy of the original event. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputAllStream(Stream inputStream); + + + /** + * Gets processing item parallelism level. + * + * @return int + */ + public int getParallelism(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java new file mode 100644 index 0000000..b496d35 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java @@ -0,0 +1,62 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; + +/** + * Stream interface. 
+ * + * @author severien + * + */ +public interface Stream { + + /** + * Puts events into a platform specific data stream. + * + * @param event + */ + public void put(ContentEvent event); + + /** + * Sets the stream id which is represented by a name. + * + * @param stream + */ + //public void setStreamId(String stream); + + + /** + * Gets stream id. + * + * @return id + */ + public String getStreamId(); + + /** + * Set batch size + * + * @param batchsize + * the suggested size for batching messages on this stream + */ + public void setBatchSize(int batchsize); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java new file mode 100755 index 0000000..6ad93ed --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java @@ -0,0 +1,85 @@ +package com.yahoo.labs.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +public interface Topology { + /* + * Name + */ + /** + * Get the topology's name + * + * @return the name of the topology + */ + public String getTopologyName(); + + /** + * Set the topology's name + * + * @param topologyName + * the name of the topology + */ + public void setTopologyName(String topologyName) ; + + /* + * Entrance Processing Items + */ + /** + * Add an EntranceProcessingItem to this topology + * + * @param epi + * the EntranceProcessingItem to be added + */ + void addEntranceProcessingItem(EntranceProcessingItem epi); + + + /* + * Processing Items + */ + /** + * Add a ProcessingItem to this topology + * with default parallelism level (i.e. 1) + * + * @param procItem + * the ProcessingItem to be added + */ + void addProcessingItem(IProcessingItem procItem); + + /** + * Add a ProcessingItem to this topology + * with an associated parallelism level + * + * @param procItem + * the ProcessingItem to be added + * @param parallelismHint + * the parallelism level + */ + void addProcessingItem(IProcessingItem procItem, int parallelismHint); + + /* + * Streams + */ + /** + * + * @param stream + */ + void addStream(Stream stream); +}
