http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java index fc0630a..758ae1e 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java @@ -31,87 +31,88 @@ import backtype.storm.Config; import backtype.storm.utils.Utils; /** - * The main class that used by samoa script to execute SAMOA task. + * The main class that used by samoa script to execute SAMOA task. * * @author Arinto Murdopo - * + * */ public class StormDoTask { - private static final Logger logger = LoggerFactory.getLogger(StormDoTask.class); - private static String localFlag = "local"; - private static String clusterFlag = "cluster"; - - /** - * The main method. - * - * @param args the arguments - */ - public static void main(String[] args) { - - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - - boolean isLocal = isLocal(tmpArgs); - int numWorker = StormSamoaUtils.numWorkers(tmpArgs); - - args = tmpArgs.toArray(new String[0]); - - //convert the arguments into Storm topology - StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); - String topologyName = stormTopo.getTopologyName(); - - Config conf = new Config(); - conf.putAll(Utils.readStormConfig()); - conf.setDebug(false); - - - if(isLocal){ - //local mode - conf.setMaxTaskParallelism(numWorker); - - backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); - cluster.submitTopology(topologyName , conf, stormTopo.getStormBuilder().createTopology()); - - backtype.storm.utils.Utils.sleep(600*1000); - - cluster.killTopology(topologyName); - cluster.shutdown(); - - }else{ - //cluster mode - conf.setNumWorkers(numWorker); - try { - backtype.storm.StormSubmitter.submitTopology(topologyName, conf, - stormTopo.getStormBuilder().createTopology()); - } catch (backtype.storm.generated.AlreadyAliveException ale) { - ale.printStackTrace(); - } catch (backtype.storm.generated.InvalidTopologyException ite) { - ite.printStackTrace(); - } - } - } - - private static boolean isLocal(List<String> tmpArgs){ - ExecutionMode executionMode = ExecutionMode.UNDETERMINED; - - int position = tmpArgs.size() - 1; - String flag = tmpArgs.get(position); - boolean isLocal = true; - - if(flag.equals(clusterFlag)){ - executionMode = ExecutionMode.CLUSTER; - isLocal = false; - }else if(flag.equals(localFlag)){ - executionMode = ExecutionMode.LOCAL; - isLocal = true; - } - - if(executionMode != ExecutionMode.UNDETERMINED){ - tmpArgs.remove(position); - } - - return isLocal; - } - - private enum ExecutionMode {LOCAL, CLUSTER, UNDETERMINED}; + private static final Logger logger = LoggerFactory.getLogger(StormDoTask.class); + private static String localFlag = "local"; + private static String clusterFlag = "cluster"; + + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { + + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + boolean isLocal = isLocal(tmpArgs); + int numWorker = StormSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + // convert the arguments into Storm topology + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + String topologyName = stormTopo.getTopologyName(); + + Config conf = new Config(); + conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); + + if (isLocal) { + // local mode + conf.setMaxTaskParallelism(numWorker); + + backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); + cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); + + backtype.storm.utils.Utils.sleep(600 * 1000); + + cluster.killTopology(topologyName); + cluster.shutdown(); + + } else { + // cluster mode + conf.setNumWorkers(numWorker); + try { + backtype.storm.StormSubmitter.submitTopology(topologyName, conf, + stormTopo.getStormBuilder().createTopology()); + } catch (backtype.storm.generated.AlreadyAliveException ale) { + ale.printStackTrace(); + } catch (backtype.storm.generated.InvalidTopologyException ite) { + ite.printStackTrace(); + } + } + } + + private static boolean isLocal(List<String> tmpArgs) { + ExecutionMode executionMode = ExecutionMode.UNDETERMINED; + + int position = tmpArgs.size() - 1; + String flag = tmpArgs.get(position); + boolean isLocal = true; + + if (flag.equals(clusterFlag)) { + executionMode = ExecutionMode.CLUSTER; + isLocal = false; + } else if (flag.equals(localFlag)) { + executionMode = ExecutionMode.LOCAL; + isLocal = true; + } + + if (executionMode != ExecutionMode.UNDETERMINED) { + tmpArgs.remove(position); + } + + return isLocal; + } + + private enum ExecutionMode { + LOCAL, CLUSTER, UNDETERMINED + }; } -
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java index d4d80bf..832ee34 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java @@ -41,168 +41,172 @@ import com.yahoo.labs.samoa.topology.Stream; * EntranceProcessingItem implementation for Storm. */ class StormEntranceProcessingItem extends AbstractEntranceProcessingItem implements StormTopologyNode { - private final StormEntranceSpout piSpout; - - StormEntranceProcessingItem(EntranceProcessor processor) { - this(processor, UUID.randomUUID().toString()); + private final StormEntranceSpout piSpout; + + StormEntranceProcessingItem(EntranceProcessor processor) { + this(processor, UUID.randomUUID().toString()); + } + + StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) { + super(processor); + this.setName(friendlyId); + this.piSpout = new StormEntranceSpout(processor); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + // piSpout.streams.add(stream); + piSpout.setOutputStream((StormStream) stream); + return this; + } + + @Override + public Stream getOutputStream() { + return piSpout.getOutputStream(); + } + + @Override + public void addToTopology(StormTopology topology, int parallelismHint) { + topology.getStormBuilder().setSpout(this.getName(), piSpout, parallelismHint); + } + + @Override + public StormStream createStream() { + return piSpout.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + + /** + * Resulting Spout of StormEntranceProcessingItem + */ + final static class StormEntranceSpout extends BaseRichSpout { + + private static final long serialVersionUID = -9066409791668954099L; + + // private final Set<StormSpoutStream> streams; + private final EntranceProcessor entranceProcessor; + private StormStream outputStream; + + // private transient SpoutStarter spoutStarter; + // private transient Executor spoutExecutors; + // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue; + + private SpoutOutputCollector collector; + + StormEntranceSpout(EntranceProcessor processor) { + // this.streams = new HashSet<StormSpoutStream>(); + this.entranceProcessor = processor; } - StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) { - super(processor); - this.setName(friendlyId); - this.piSpout = new StormEntranceSpout(processor); + public StormStream getOutputStream() { + return outputStream; } - @Override - public EntranceProcessingItem setOutputStream(Stream stream) { - // piSpout.streams.add(stream); - piSpout.setOutputStream((StormStream) stream); - return this; - } - - @Override - public Stream getOutputStream() { - return piSpout.getOutputStream(); + public void setOutputStream(StormStream stream) { + this.outputStream = stream; } - + @Override - public void addToTopology(StormTopology topology, int parallelismHint) { - topology.getStormBuilder().setSpout(this.getName(), piSpout, parallelismHint); + public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>(); + + // Processor and this class share the same instance of stream + // for (StormSpoutStream stream : streams) { + // stream.setSpout(this); + // } + // outputStream.setSpout(this); + + this.entranceProcessor.onCreate(context.getThisTaskId()); + // this.spoutStarter = new SpoutStarter(this.starter); + + // this.spoutExecutors = Executors.newSingleThreadExecutor(); + // this.spoutExecutors.execute(spoutStarter); } @Override - public StormStream createStream() { - return piSpout.createStream(this.getName()); + public void nextTuple() { + if (entranceProcessor.hasNext()) { + Values value = newValues(entranceProcessor.nextEvent()); + collector.emit(outputStream.getOutputId(), value); + } else + Utils.sleep(1000); + // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, + // TimeUnit.MILLISECONDS); + // if (tupleInfo != null) { + // Values value = newValues(tupleInfo.getContentEvent()); + // collector.emit(tupleInfo.getStormStream().getOutputId(), value); + // } } @Override - public String getId() { - return this.getName(); + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // for (StormStream stream : streams) { + // declarer.declareStream(stream.getOutputId(), new + // Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + // StormSamoaUtils.KEY_FIELD)); + // } + declarer.declareStream(outputStream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + StormSamoaUtils.KEY_FIELD)); } - @Override - public String toString() { - StringBuilder sb = new StringBuilder(super.toString()); - sb.insert(0, String.format("id: %s, ", this.getName())); - return sb.toString(); + StormStream createStream(String piId) { + // StormSpoutStream stream = new StormSpoutStream(piId); + StormStream stream = new StormBoltStream(piId); + // streams.add(stream); + return stream; } - /** - * Resulting Spout of StormEntranceProcessingItem - */ - final static class StormEntranceSpout extends BaseRichSpout { - - private static final long serialVersionUID = -9066409791668954099L; - - // private final Set<StormSpoutStream> streams; - private final EntranceProcessor entranceProcessor; - private StormStream outputStream; - - // private transient SpoutStarter spoutStarter; - // private transient Executor spoutExecutors; - // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue; - - private SpoutOutputCollector collector; - - StormEntranceSpout(EntranceProcessor processor) { - // this.streams = new HashSet<StormSpoutStream>(); - this.entranceProcessor = processor; - } - - public StormStream getOutputStream() { - return outputStream; - } - - public void setOutputStream(StormStream stream) { - this.outputStream = stream; - } - - @Override - public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>(); - - // Processor and this class share the same instance of stream - // for (StormSpoutStream stream : streams) { - // stream.setSpout(this); - // } - // outputStream.setSpout(this); - - this.entranceProcessor.onCreate(context.getThisTaskId()); - // this.spoutStarter = new SpoutStarter(this.starter); - - // this.spoutExecutors = Executors.newSingleThreadExecutor(); - // this.spoutExecutors.execute(spoutStarter); - } - - @Override - public void nextTuple() { - if (entranceProcessor.hasNext()) { - Values value = newValues(entranceProcessor.nextEvent()); - collector.emit(outputStream.getOutputId(), value); - } else - Utils.sleep(1000); - // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, TimeUnit.MILLISECONDS); - // if (tupleInfo != null) { - // Values value = newValues(tupleInfo.getContentEvent()); - // collector.emit(tupleInfo.getStormStream().getOutputId(), value); - // } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // for (StormStream stream : streams) { - // declarer.declareStream(stream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD)); - // } - declarer.declareStream(outputStream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD)); - } - - StormStream createStream(String piId) { - // StormSpoutStream stream = new StormSpoutStream(piId); - StormStream stream = new StormBoltStream(piId); - // streams.add(stream); - return stream; - } - - // void put(StormSpoutStream stream, ContentEvent contentEvent) { - // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent)); - // } - - private Values newValues(ContentEvent contentEvent) { - return new Values(contentEvent, contentEvent.getKey()); - } - - // private final static class StormTupleInfo { - // - // private final StormStream stream; - // private final ContentEvent event; - // - // StormTupleInfo(StormStream stream, ContentEvent event) { - // this.stream = stream; - // this.event = event; - // } - // - // public StormStream getStormStream() { - // return this.stream; - // } - // - // public ContentEvent getContentEvent() { - // return this.event; - // } - // } - - // private final static class SpoutStarter implements Runnable { - // - // private final TopologyStarter topoStarter; - // - // SpoutStarter(TopologyStarter topoStarter) { - // this.topoStarter = topoStarter; - // } - // - // @Override - // public void run() { - // this.topoStarter.start(); - // } - // } + // void put(StormSpoutStream stream, ContentEvent contentEvent) { + // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent)); + // } + + private Values newValues(ContentEvent contentEvent) { + return new Values(contentEvent, contentEvent.getKey()); } + + // private final static class StormTupleInfo { + // + // private final StormStream stream; + // private final ContentEvent event; + // + // StormTupleInfo(StormStream stream, ContentEvent event) { + // this.stream = stream; + // this.event = event; + // } + // + // public StormStream getStormStream() { + // return this.stream; + // } + // + // public ContentEvent getContentEvent() { + // return this.event; + // } + // } + + // private final static class SpoutStarter implements Runnable { + // + // private final TopologyStarter topoStarter; + // + // SpoutStarter(TopologyStarter topoStarter) { + // this.topoStarter = topoStarter; + // } + // + // @Override + // public void run() { + // this.topoStarter.start(); + // } + // } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java index 5f86855..6594aa7 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java @@ -34,42 +34,42 @@ import backtype.storm.utils.Utils; * Utility class to submit samoa-storm jar to a Storm cluster. * * @author Arinto Murdopo - * + * */ public class StormJarSubmitter { - - public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation"; - /** - * @param args - * @throws IOException - */ - public static void main(String[] args) throws IOException { - - Config config = new Config(); - config.putAll(Utils.readCommandLineOpts()); - config.putAll(Utils.readStormConfig()); + public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation"; + + /** + * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + + Config config = new Config(); + config.putAll(Utils.readCommandLineOpts()); + config.putAll(Utils.readStormConfig()); + + String nimbusHost = (String) config.get(Config.NIMBUS_HOST); + int nimbusThriftPort = Utils.getInt(config + .get(Config.NIMBUS_THRIFT_PORT)); + + System.out.println("Nimbus host " + nimbusHost); + System.out.println("Nimbus thrift port " + nimbusThriftPort); + + System.out.println("uploading jar from " + args[0]); + String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]); + + System.out.println("Uploaded jar file location: "); + System.out.println(uploadedJarLocation); - String nimbusHost = (String) config.get(Config.NIMBUS_HOST); - int nimbusThriftPort = Utils.getInt(config - .get(Config.NIMBUS_THRIFT_PORT)); + Properties props = StormSamoaUtils.getProperties(); + props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, uploadedJarLocation); - System.out.println("Nimbus host " + nimbusHost); - System.out.println("Nimbus thrift port " + nimbusThriftPort); + File f = new File("src/main/resources/samoa-storm-cluster.properties"); + f.createNewFile(); - System.out.println("uploading jar from " + args[0]); - String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]); - - System.out.println("Uploaded jar file location: "); - System.out.println(uploadedJarLocation); - - Properties props = StormSamoaUtils.getProperties(); - props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, uploadedJarLocation); - - File f = new File("src/main/resources/samoa-storm-cluster.properties"); - f.createNewFile(); - - OutputStream out = new FileOutputStream(f); - props.store(out, "properties file to store uploaded jar location from StormJarSubmitter"); - } + OutputStream out = new FileOutputStream(f); + props.store(out, "properties file to store uploaded jar location from StormJarSubmitter"); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java index 73879f6..1a9064c 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java @@ -44,127 +44,126 @@ import backtype.storm.tuple.Tuple; /** * ProcessingItem implementation for Storm. + * * @author Arinto Murdopo - * + * */ class StormProcessingItem extends AbstractProcessingItem implements StormTopologyNode { - private final ProcessingItemBolt piBolt; - private BoltDeclarer piBoltDeclarer; - - //TODO: should we put parallelism hint here? - //imo, parallelism hint only declared when we add this PI in the topology - //open for dicussion :p - - StormProcessingItem(Processor processor, int parallelismHint){ - this(processor, UUID.randomUUID().toString(), parallelismHint); - } - - StormProcessingItem(Processor processor, String friendlyId, int parallelismHint){ - super(processor, parallelismHint); - this.piBolt = new ProcessingItemBolt(processor); - this.setName(friendlyId); - } - - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - StormStream stormInputStream = (StormStream) inputStream; - InputStreamId inputId = stormInputStream.getInputId(); - - switch(scheme) { - case SHUFFLE: - piBoltDeclarer.shuffleGrouping(inputId.getComponentId(),inputId.getStreamId()); - break; - case GROUP_BY_KEY: - piBoltDeclarer.fieldsGrouping( - inputId.getComponentId(), - inputId.getStreamId(), - new Fields(StormSamoaUtils.KEY_FIELD)); - break; - case BROADCAST: - piBoltDeclarer.allGrouping( - inputId.getComponentId(), - inputId.getStreamId()); - break; - } - return this; - } - - @Override - public void addToTopology(StormTopology topology, int parallelismHint) { - if(piBoltDeclarer != null){ - //throw exception that one PI only belong to one topology - }else{ - TopologyBuilder stormBuilder = topology.getStormBuilder(); - this.piBoltDeclarer = stormBuilder.setBolt(this.getName(), - this.piBolt, parallelismHint); - } - } - - @Override - public StormStream createStream() { - return piBolt.createStream(this.getName()); - } - - @Override - public String getId() { - return this.getName(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(super.toString()); - sb.insert(0, String.format("id: %s, ", this.getName())); - return sb.toString(); - } - - private final static class ProcessingItemBolt extends BaseRichBolt{ - - private static final long serialVersionUID = -6637673741263199198L; - - private final Set<StormBoltStream> streams; - private final Processor processor; - - private OutputCollector collector; - - ProcessingItemBolt(Processor processor){ - this.streams = new HashSet<StormBoltStream>(); - this.processor = processor; - } - - @Override - public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - //Processor and this class share the same instance of stream - for(StormBoltStream stream: streams){ - stream.setCollector(this.collector); - } - - this.processor.onCreate(context.getThisTaskId()); - } - - @Override - public void execute(Tuple input) { - Object sentObject = input.getValue(0); - ContentEvent sentEvent = (ContentEvent)sentObject; - processor.process(sentEvent); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(StormStream stream: streams){ - declarer.declareStream(stream.getOutputId(), - new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, - StormSamoaUtils.KEY_FIELD)); - } - } - - StormStream createStream(String piId){ - StormBoltStream stream = new StormBoltStream(piId); - streams.add(stream); - return stream; - } - } -} + private final ProcessingItemBolt piBolt; + private BoltDeclarer piBoltDeclarer; + + // TODO: should we put parallelism hint here? + // imo, parallelism hint only declared when we add this PI in the topology + // open for dicussion :p + + StormProcessingItem(Processor processor, int parallelismHint) { + this(processor, UUID.randomUUID().toString(), parallelismHint); + } + + StormProcessingItem(Processor processor, String friendlyId, int parallelismHint) { + super(processor, parallelismHint); + this.piBolt = new ProcessingItemBolt(processor); + this.setName(friendlyId); + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StormStream stormInputStream = (StormStream) inputStream; + InputStreamId inputId = stormInputStream.getInputId(); + + switch (scheme) { + case SHUFFLE: + piBoltDeclarer.shuffleGrouping(inputId.getComponentId(), inputId.getStreamId()); + break; + case GROUP_BY_KEY: + piBoltDeclarer.fieldsGrouping( + inputId.getComponentId(), + inputId.getStreamId(), + new Fields(StormSamoaUtils.KEY_FIELD)); + break; + case BROADCAST: + piBoltDeclarer.allGrouping( + inputId.getComponentId(), + inputId.getStreamId()); + break; + } + return this; + } + + @Override + public void addToTopology(StormTopology topology, int parallelismHint) { + if (piBoltDeclarer != null) { + // throw exception that one PI only belong to one topology + } else { + TopologyBuilder stormBuilder = topology.getStormBuilder(); + this.piBoltDeclarer = stormBuilder.setBolt(this.getName(), + this.piBolt, parallelismHint); + } + } + + @Override + public StormStream createStream() { + return piBolt.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + private final static class ProcessingItemBolt extends BaseRichBolt { + + private static final long serialVersionUID = -6637673741263199198L; + + private final Set<StormBoltStream> streams; + private final Processor processor; + + private OutputCollector collector; + + ProcessingItemBolt(Processor processor) { + this.streams = new HashSet<StormBoltStream>(); + this.processor = processor; + } + + @Override + public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + // Processor and this class share the same instance of stream + for (StormBoltStream stream : streams) { + stream.setCollector(this.collector); + } + + this.processor.onCreate(context.getThisTaskId()); + } + + @Override + public void execute(Tuple input) { + Object sentObject = input.getValue(0); + ContentEvent sentEvent = (ContentEvent) sentObject; + processor.process(sentEvent); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (StormStream stream : streams) { + declarer.declareStream(stream.getOutputId(), + new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + StormSamoaUtils.KEY_FIELD)); + } + } + + StormStream createStream(String piId) { + StormBoltStream stream = new StormBoltStream(piId); + streams.add(stream); + return stream; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java index 7c4769e..d978a8f 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java @@ -34,75 +34,82 @@ import org.slf4j.LoggerFactory; import com.yahoo.labs.samoa.tasks.Task; /** - * Utility class for samoa-storm project. It is used by StormDoTask to process its arguments. + * Utility class for samoa-storm project. It is used by StormDoTask to process + * its arguments. + * * @author Arinto Murdopo - * + * */ public class StormSamoaUtils { - - private static final Logger logger = LoggerFactory.getLogger(StormSamoaUtils.class); - - static final String KEY_FIELD = "key"; - static final String CONTENT_EVENT_FIELD = "content_event"; - - static Properties getProperties() throws IOException{ - Properties props = new Properties(); - InputStream is; - - File f = new File("src/main/resources/samoa-storm-cluster.properties"); // FIXME it does not exist anymore - is = new FileInputStream(f); - - try { - props.load(is); - } catch (IOException e1) { - System.out.println("Fail to load property file"); - return null; - } finally{ - is.close(); - } - - return props; - } - - public static StormTopology argsToTopology(String[] args){ - StringBuilder cliString = new StringBuilder(); - for (String arg : args) { - cliString.append(" ").append(arg); - } - logger.debug("Command line string = {}", cliString.toString()); - - Task task = getTask(cliString.toString()); - - //TODO: remove setFactory method with DynamicBinding - task.setFactory(new StormComponentFactory()); - task.init(); - - return (StormTopology)task.getTopology(); - } - - public static int numWorkers(List<String> tmpArgs){ - int position = tmpArgs.size() - 1; - int numWorkers; - - try { - numWorkers = Integer.parseInt(tmpArgs.get(position)); - tmpArgs.remove(position); - } catch (NumberFormatException e) { - numWorkers = 4; - } - - return numWorkers; - } - - public static Task getTask(String cliString) { - Task task = null; - try { - logger.debug("Providing task [{}]", cliString); - task = ClassOption.cliStringToObject(cliString, Task.class, null); - } catch (Exception e) { - logger.warn("Fail in initializing the task!"); - e.printStackTrace(); - } - return task; + + private static final Logger logger = LoggerFactory.getLogger(StormSamoaUtils.class); + + static final String KEY_FIELD = "key"; + static final String CONTENT_EVENT_FIELD = "content_event"; + + static Properties getProperties() throws IOException { + Properties props = new Properties(); + InputStream is; + + File f = new File("src/main/resources/samoa-storm-cluster.properties"); // FIXME + // it + // does + // not + // exist + // anymore + is = new FileInputStream(f); + + try { + props.load(is); + } catch (IOException e1) { + System.out.println("Fail to load property file"); + return null; + } finally { + is.close(); + } + + return props; + } + + public static StormTopology argsToTopology(String[] args) { + StringBuilder cliString = new StringBuilder(); + for (String arg : args) { + cliString.append(" ").append(arg); + } + logger.debug("Command line string = {}", cliString.toString()); + + Task task = getTask(cliString.toString()); + + // TODO: remove setFactory method with DynamicBinding + task.setFactory(new StormComponentFactory()); + task.init(); + + return (StormTopology) task.getTopology(); + } + + public static int numWorkers(List<String> tmpArgs) { + int position = tmpArgs.size() - 1; + int numWorkers; + + try { + numWorkers = Integer.parseInt(tmpArgs.get(position)); + tmpArgs.remove(position); + } catch (NumberFormatException e) { + numWorkers = 4; + } + + return numWorkers; + } + + public static Task getTask(String cliString) { + Task task = null; + try { + logger.debug("Providing task [{}]", cliString); + task = ClassOption.cliStringToObject(cliString, Task.class, null); + } catch (Exception e) { + logger.warn("Fail in initializing the task!"); + e.printStackTrace(); } + return task; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java index d066e42..06f5bb2 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java @@ -62,4 +62,4 @@ // return null; // } // -//} +// } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java index f67ab19..ed39a50 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java @@ -27,59 +27,60 @@ import com.yahoo.labs.samoa.topology.Stream; /** * Abstract class to implement Storm Stream + * * @author Arinto Murdopo - * + * */ abstract class StormStream implements Stream, java.io.Serializable { - - /** + + /** * */ - private static final long serialVersionUID = 281835563756514852L; - protected final String outputStreamId; - protected final InputStreamId inputStreamId; - - public StormStream(String stormComponentId){ - this.outputStreamId = UUID.randomUUID().toString(); - this.inputStreamId = new InputStreamId(stormComponentId, this.outputStreamId); - } - - @Override - public abstract void put(ContentEvent contentEvent); - - String getOutputId(){ - return this.outputStreamId; - } - - InputStreamId getInputId(){ - return this.inputStreamId; - } - - final static class InputStreamId implements java.io.Serializable{ - - /** + private static final long serialVersionUID = 281835563756514852L; + protected final String outputStreamId; + protected final InputStreamId inputStreamId; + + public StormStream(String stormComponentId) { + this.outputStreamId = UUID.randomUUID().toString(); + this.inputStreamId = new InputStreamId(stormComponentId, this.outputStreamId); + } + + @Override + public abstract void put(ContentEvent contentEvent); + + String getOutputId() { + return this.outputStreamId; + } + + InputStreamId getInputId() { + return this.inputStreamId; + } + + final static class InputStreamId implements java.io.Serializable { + + /** * */ - private static final long serialVersionUID = -7457995634133691295L; - private final String componentId; - private final String streamId; - - InputStreamId(String componentId, String streamId){ - this.componentId = componentId; - this.streamId = streamId; - } - - String getComponentId(){ - return componentId; - } - - String getStreamId(){ - return streamId; - } - } - - @Override - public void setBatchSize(int batchSize) { - // Ignore batch size - } + private static final long serialVersionUID = -7457995634133691295L; + private final String componentId; + private final String streamId; + + InputStreamId(String componentId, String streamId) { + this.componentId = componentId; + this.streamId = streamId; + } + + String getComponentId() { + return componentId; + } + + String getStreamId() { + return streamId; + } + } + + @Override + public void setBatchSize(int batchSize) { + // Ignore batch size + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java index 7a49d8b..20995d5 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java @@ -27,26 +27,27 @@ import com.yahoo.labs.samoa.topology.AbstractTopology; /** * Adaptation of SAMOA topology in samoa-storm + * * @author Arinto Murdopo - * + * */ public class StormTopology extends AbstractTopology { - - private TopologyBuilder builder; - - public StormTopology(String topologyName){ - super(topologyName); - this.builder = new TopologyBuilder(); - } - - @Override - public void addProcessingItem(IProcessingItem procItem, int parallelismHint){ - StormTopologyNode stormNode = (StormTopologyNode) procItem; - stormNode.addToTopology(this, parallelismHint); - super.addProcessingItem(procItem, parallelismHint); - } - - public TopologyBuilder getStormBuilder(){ - return builder; - } + + private TopologyBuilder builder; + + public StormTopology(String topologyName) { + super(topologyName); + this.builder = new TopologyBuilder(); + } + + @Override + public void addProcessingItem(IProcessingItem procItem, int parallelismHint) { + StormTopologyNode stormNode = (StormTopologyNode) procItem; + stormNode.addToTopology(this, parallelismHint); + super.addProcessingItem(procItem, parallelismHint); + } + + public TopologyBuilder getStormBuilder() { + return builder; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java index 07fccbf..8be3a1b 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java @@ -22,13 +22,16 @@ package com.yahoo.labs.samoa.topology.impl; /** * Interface to represent a node in samoa-storm topology. + * * @author Arinto Murdopo - * + * */ interface StormTopologyNode { - void addToTopology(StormTopology topology, int parallelismHint); - StormStream createStream(); - String getId(); - + void addToTopology(StormTopology topology, int parallelismHint); + + StormStream createStream(); + + String getId(); + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java index 1e1b048..a4f1f51 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java @@ -41,93 +41,95 @@ import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; /** - * Helper class to submit SAMOA task into Storm without the need of submitting the jar file. - * The jar file must be submitted first using StormJarSubmitter class. + * Helper class to submit SAMOA task into Storm without the need of submitting + * the jar file. The jar file must be submitted first using StormJarSubmitter + * class. + * * @author Arinto Murdopo - * + * */ public class StormTopologySubmitter { - - public static String YJP_OPTIONS_KEY="YjpOptions"; - - private static Logger logger = LoggerFactory.getLogger(StormTopologySubmitter.class); - - public static void main(String[] args) throws IOException{ - Properties props = StormSamoaUtils.getProperties(); - - String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY); - if(uploadedJarLocation == null){ - logger.error("Invalid properties file. It must have key {}", - StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY); - return; - } - - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - int numWorkers = StormSamoaUtils.numWorkers(tmpArgs); - - args = tmpArgs.toArray(new String[0]); - StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); - - Config conf = new Config(); - conf.putAll(Utils.readStormConfig()); - conf.putAll(Utils.readCommandLineOpts()); - conf.setDebug(false); - conf.setNumWorkers(numWorkers); - - String profilerOption = - props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY); - if(profilerOption != null){ - String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS); - StringBuilder optionBuilder = new StringBuilder(); - if(topoWorkerChildOpts != null){ - optionBuilder.append(topoWorkerChildOpts); - optionBuilder.append(' '); - } - optionBuilder.append(profilerOption); - conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString()); - } - - Map<String, Object> myConfigMap = new HashMap<String, Object>(conf); - StringWriter out = new StringWriter(); - - try { - JSONValue.writeJSONString(myConfigMap, out); - } catch (IOException e) { - System.out.println("Error in writing JSONString"); - e.printStackTrace(); - return; - } - - Config config = new Config(); - config.putAll(Utils.readStormConfig()); - - String nimbusHost = (String) config.get(Config.NIMBUS_HOST); - - NimbusClient nc = new NimbusClient(nimbusHost); - String topologyName = stormTopo.getTopologyName(); - try { - System.out.println("Submitting topology with name: " - + topologyName); - nc.getClient().submitTopology(topologyName, uploadedJarLocation, - out.toString(), stormTopo.getStormBuilder().createTopology()); - System.out.println(topologyName + " is successfully submitted"); - - } catch (AlreadyAliveException aae) { - System.out.println("Fail to submit " + topologyName - + "\nError message: " + aae.get_msg()); - } catch (InvalidTopologyException ite) { - System.out.println("Invalid topology for " + topologyName); - ite.printStackTrace(); - } catch (TException te) { - System.out.println("Texception for " + topologyName); - te.printStackTrace(); - } - } - - private static String uploadedJarLocation(List<String> tmpArgs){ - int position = tmpArgs.size() - 1; - String uploadedJarLocation = tmpArgs.get(position); - tmpArgs.remove(position); - return uploadedJarLocation; - } + + public static String YJP_OPTIONS_KEY = "YjpOptions"; + + private static Logger logger = LoggerFactory.getLogger(StormTopologySubmitter.class); + + public static void main(String[] args) throws IOException { + Properties props = StormSamoaUtils.getProperties(); + + String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY); + if (uploadedJarLocation == null) { + logger.error("Invalid properties file. It must have key {}", + StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY); + return; + } + + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + int numWorkers = StormSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + + Config conf = new Config(); + conf.putAll(Utils.readStormConfig()); + conf.putAll(Utils.readCommandLineOpts()); + conf.setDebug(false); + conf.setNumWorkers(numWorkers); + + String profilerOption = + props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY); + if (profilerOption != null) { + String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS); + StringBuilder optionBuilder = new StringBuilder(); + if (topoWorkerChildOpts != null) { + optionBuilder.append(topoWorkerChildOpts); + optionBuilder.append(' '); + } + optionBuilder.append(profilerOption); + conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString()); + } + + Map<String, Object> myConfigMap = new HashMap<String, Object>(conf); + StringWriter out = new StringWriter(); + + try { + JSONValue.writeJSONString(myConfigMap, out); + } catch (IOException e) { + System.out.println("Error in writing JSONString"); + e.printStackTrace(); + return; + } + + Config config = new Config(); + config.putAll(Utils.readStormConfig()); + + String nimbusHost = (String) config.get(Config.NIMBUS_HOST); + + NimbusClient nc = new NimbusClient(nimbusHost); + String topologyName = stormTopo.getTopologyName(); + try { + System.out.println("Submitting topology with name: " + + topologyName); + nc.getClient().submitTopology(topologyName, uploadedJarLocation, + out.toString(), stormTopo.getStormBuilder().createTopology()); + System.out.println(topologyName + " is successfully submitted"); + + } catch (AlreadyAliveException aae) { + System.out.println("Fail to submit " + topologyName + + "\nError message: " + aae.get_msg()); + } catch (InvalidTopologyException ite) { + System.out.println("Invalid topology for " + topologyName); + ite.printStackTrace(); + } catch (TException te) { + System.out.println("Texception for " + topologyName); + te.printStackTrace(); + } + } + + private static String uploadedJarLocation(List<String> tmpArgs) { + int position = tmpArgs.size() - 1; + String uploadedJarLocation = tmpArgs.get(position); + tmpArgs.remove(position); + return uploadedJarLocation; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java index 15b80b5..9f6089c 100644 --- a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java +++ b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java @@ -24,45 +24,43 @@ import org.junit.Test; public class AlgosTest { + @Test(timeout = 60000) + public void testVHTWithStorm() throws Exception { - @Test(timeout = 60000) - public void testVHTWithStorm() throws Exception { + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(55f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) + .resultFilePollTimeout(30) + .prePollWait(15) + .taskClassName(LocalStormDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(55f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) - .resultFilePollTimeout(30) - .prePollWait(15) - .taskClassName(LocalStormDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); + } - } - - @Test(timeout = 120000) - public void testBaggingWithStorm() throws Exception { - TestParams baggingConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(180_000) - .classifiedInstances(190_000) - .classificationsCorrect(60f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) - .resultFilePollTimeout(40) - .prePollWait(20) - .taskClassName(LocalStormDoTask.class.getName()) - .build(); - TestUtils.test(baggingConfig); - - } + @Test(timeout = 120000) + public void testBaggingWithStorm() throws Exception { + TestParams baggingConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(180_000) + .classifiedInstances(190_000) + .classificationsCorrect(60f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) + .resultFilePollTimeout(40) + .prePollWait(20) + .taskClassName(LocalStormDoTask.class.getName()) + .build(); + TestUtils.test(baggingConfig); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java index ec8929a..d233ca6 100644 --- a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java +++ b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java @@ -38,41 +38,46 @@ import backtype.storm.topology.TopologyBuilder; import com.yahoo.labs.samoa.core.Processor; public class StormProcessingItemTest { - private static final int PARRALLELISM_HINT_2 = 2; - private static final int PARRALLELISM_HINT_4 = 4; - private static final String ID = "id"; - @Tested private StormProcessingItem pi; - @Mocked private Processor processor; - @Mocked private StormTopology topology; - @Mocked private TopologyBuilder stormBuilder = new TopologyBuilder(); + private static final int PARRALLELISM_HINT_2 = 2; + private static final int PARRALLELISM_HINT_4 = 4; + private static final String ID = "id"; + @Tested + private StormProcessingItem pi; + @Mocked + private Processor processor; + @Mocked + private StormTopology topology; + @Mocked + private TopologyBuilder stormBuilder = new TopologyBuilder(); - @Before - public void setUp() { - pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2); - } + @Before + public void setUp() { + pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2); + } - @Test - public void testAddToTopology() { - new Expectations() { - { - topology.getStormBuilder(); - result = stormBuilder; + @Test + public void testAddToTopology() { + new Expectations() { + { + topology.getStormBuilder(); + result = stormBuilder; - stormBuilder.setBolt(ID, (IRichBolt) any, anyInt); - result = new MockUp<BoltDeclarer>() { - }.getMockInstance(); - } - }; + stormBuilder.setBolt(ID, (IRichBolt) any, anyInt); + result = new MockUp<BoltDeclarer>() { + }.getMockInstance(); + } + }; - pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is ignored + pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is + // ignored - new Verifications() { - { - assertEquals(pi.getProcessor(), processor); - // TODO add methods to explore a topology and verify them - assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2); - assertEquals(pi.getId(), ID); - } - }; - } + new Verifications() { + { + assertEquals(pi.getProcessor(), processor); + // TODO add methods to explore a topology and verify them + assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2); + assertEquals(pi.getId(), ID); + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java index 08ad94f..26b6b74 100644 --- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java +++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java @@ -2,234 +2,234 @@ package com.yahoo.labs.samoa; public class TestParams { - /** - * templates that take the following parameters: - * <ul> - * <li>the output file location as an argument (-d), - * <li>the maximum number of instances for testing/training (-i) - * <li>the sampling size (-f) - * <li>the delay in ms between input instances (-w) , default is zero - * </ul> - * as well as the maximum number of instances for testing/training (-i) and the sampling size (-f) - */ - public static class Templates { - - public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " - + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " + - "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)"; - - public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " - + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " + - "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)"; - - // setting the number of nominal attributes to zero significantly reduces the processing time, - // so that it's acceptable in a test case - public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " - + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " + - "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)"; - - } - - - public static final String EVALUATION_INSTANCES = "evaluation instances"; - public static final String CLASSIFIED_INSTANCES = "classified instances"; - public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)"; - public static final String KAPPA_STAT = "Kappa Statistic (percent)"; - public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)"; - - + /** + * templates that take the following parameters: + * <ul> + * <li>the output file location as an argument (-d), + * <li>the maximum number of instances for testing/training (-i) + * <li>the sampling size (-f) + * <li>the delay in ms between input instances (-w) , default is zero + * </ul> + * as well as the maximum number of instances for testing/training (-i) and + * the sampling size (-f) + */ + public static class Templates { + + public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " + + "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)"; + + public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " + + "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)"; + + // setting the number of nominal attributes to zero significantly reduces + // the processing time, + // so that it's acceptable in a test case + public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " + + "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)"; + + } + + public static final String EVALUATION_INSTANCES = "evaluation instances"; + public static final String CLASSIFIED_INSTANCES = "classified instances"; + public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)"; + public static final String KAPPA_STAT = "Kappa Statistic (percent)"; + public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)"; + + private long inputInstances; + private long samplingSize; + private long evaluationInstances; + private long classifiedInstances; + private float classificationsCorrect; + private float kappaStat; + private float kappaTempStat; + private String cliStringTemplate; + private int pollTimeoutSeconds; + private final int prePollWait; + private int inputDelayMicroSec; + private String taskClassName; + + private TestParams(String taskClassName, + long inputInstances, + long samplingSize, + long evaluationInstances, + long classifiedInstances, + float classificationsCorrect, + float kappaStat, + float kappaTempStat, + String cliStringTemplate, + int pollTimeoutSeconds, + int prePollWait, + int inputDelayMicroSec) { + this.taskClassName = taskClassName; + this.inputInstances = inputInstances; + this.samplingSize = samplingSize; + this.evaluationInstances = evaluationInstances; + this.classifiedInstances = classifiedInstances; + this.classificationsCorrect = classificationsCorrect; + this.kappaStat = kappaStat; + this.kappaTempStat = kappaTempStat; + this.cliStringTemplate = cliStringTemplate; + this.pollTimeoutSeconds = pollTimeoutSeconds; + this.prePollWait = prePollWait; + this.inputDelayMicroSec = inputDelayMicroSec; + } + + public String getTaskClassName() { + return taskClassName; + } + + public long getInputInstances() { + return inputInstances; + } + + public long getSamplingSize() { + return samplingSize; + } + + public int getPollTimeoutSeconds() { + return pollTimeoutSeconds; + } + + public int getPrePollWaitSeconds() { + return prePollWait; + } + + public String getCliStringTemplate() { + return cliStringTemplate; + } + + public long getEvaluationInstances() { + return evaluationInstances; + } + + public long getClassifiedInstances() { + return classifiedInstances; + } + + public float getClassificationsCorrect() { + return classificationsCorrect; + } + + public float getKappaStat() { + return kappaStat; + } + + public float getKappaTempStat() { + return kappaTempStat; + } + + public int getInputDelayMicroSec() { + return inputDelayMicroSec; + } + + @Override + public String toString() { + return "TestParams{\n" + + "inputInstances=" + inputInstances + "\n" + + "samplingSize=" + samplingSize + "\n" + + "evaluationInstances=" + evaluationInstances + "\n" + + "classifiedInstances=" + classifiedInstances + "\n" + + "classificationsCorrect=" + classificationsCorrect + "\n" + + "kappaStat=" + kappaStat + "\n" + + "kappaTempStat=" + kappaTempStat + "\n" + + "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" + + "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" + + "prePollWait=" + prePollWait + "\n" + + "taskClassName='" + taskClassName + '\'' + "\n" + + "inputDelayMicroSec=" + inputDelayMicroSec + "\n" + + '}'; + } + + public static class Builder { private long inputInstances; private long samplingSize; private long evaluationInstances; private long classifiedInstances; private float classificationsCorrect; - private float kappaStat; - private float kappaTempStat; + private float kappaStat = 0f; + private float kappaTempStat = 0f; private String cliStringTemplate; - private int pollTimeoutSeconds; - private final int prePollWait; - private int inputDelayMicroSec; + private int pollTimeoutSeconds = 10; + private int prePollWaitSeconds = 10; private String taskClassName; + private int inputDelayMicroSec = 0; - private TestParams(String taskClassName, - long inputInstances, - long samplingSize, - long evaluationInstances, - long classifiedInstances, - float classificationsCorrect, - float kappaStat, - float kappaTempStat, - String cliStringTemplate, - int pollTimeoutSeconds, - int prePollWait, - int inputDelayMicroSec) { - this.taskClassName = taskClassName; - this.inputInstances = inputInstances; - this.samplingSize = samplingSize; - this.evaluationInstances = evaluationInstances; - this.classifiedInstances = classifiedInstances; - this.classificationsCorrect = classificationsCorrect; - this.kappaStat = kappaStat; - this.kappaTempStat = kappaTempStat; - this.cliStringTemplate = cliStringTemplate; - this.pollTimeoutSeconds = pollTimeoutSeconds; - this.prePollWait = prePollWait; - this.inputDelayMicroSec = inputDelayMicroSec; - } - - public String getTaskClassName() { - return taskClassName; - } - - public long getInputInstances() { - return inputInstances; + public Builder taskClassName(String taskClassName) { + this.taskClassName = taskClassName; + return this; } - public long getSamplingSize() { - return samplingSize; + public Builder inputInstances(long inputInstances) { + this.inputInstances = inputInstances; + return this; } - public int getPollTimeoutSeconds() { - return pollTimeoutSeconds; + public Builder samplingSize(long samplingSize) { + this.samplingSize = samplingSize; + return this; } - public int getPrePollWaitSeconds() { - return prePollWait; + public Builder evaluationInstances(long evaluationInstances) { + this.evaluationInstances = evaluationInstances; + return this; } - public String getCliStringTemplate() { - return cliStringTemplate; + public Builder classifiedInstances(long classifiedInstances) { + this.classifiedInstances = classifiedInstances; + return this; } - public long getEvaluationInstances() { - return evaluationInstances; + public Builder classificationsCorrect(float classificationsCorrect) { + this.classificationsCorrect = classificationsCorrect; + return this; } - public long getClassifiedInstances() { - return classifiedInstances; + public Builder kappaStat(float kappaStat) { + this.kappaStat = kappaStat; + return this; } - public float getClassificationsCorrect() { - return classificationsCorrect; + public Builder kappaTempStat(float kappaTempStat) { + this.kappaTempStat = kappaTempStat; + return this; } - public float getKappaStat() { - return kappaStat; + public Builder cliStringTemplate(String cliStringTemplate) { + this.cliStringTemplate = cliStringTemplate; + return this; } - public float getKappaTempStat() { - return kappaTempStat; + public Builder resultFilePollTimeout(int pollTimeoutSeconds) { + this.pollTimeoutSeconds = pollTimeoutSeconds; + return this; } - public int getInputDelayMicroSec() { - return inputDelayMicroSec; + public Builder inputDelayMicroSec(int inputDelayMicroSec) { + this.inputDelayMicroSec = inputDelayMicroSec; + return this; } - @Override - public String toString() { - return "TestParams{\n" + - "inputInstances=" + inputInstances + "\n" + - "samplingSize=" + samplingSize + "\n" + - "evaluationInstances=" + evaluationInstances + "\n" + - "classifiedInstances=" + classifiedInstances + "\n" + - "classificationsCorrect=" + classificationsCorrect + "\n" + - "kappaStat=" + kappaStat + "\n" + - "kappaTempStat=" + kappaTempStat + "\n" + - "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" + - "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" + - "prePollWait=" + prePollWait + "\n" + - "taskClassName='" + taskClassName + '\'' + "\n" + - "inputDelayMicroSec=" + inputDelayMicroSec + "\n" + - '}'; + public Builder prePollWait(int prePollWaitSeconds) { + this.prePollWaitSeconds = prePollWaitSeconds; + return this; } - public static class Builder { - private long inputInstances; - private long samplingSize; - private long evaluationInstances; - private long classifiedInstances; - private float classificationsCorrect; - private float kappaStat =0f; - private float kappaTempStat =0f; - private String cliStringTemplate; - private int pollTimeoutSeconds = 10; - private int prePollWaitSeconds = 10; - private String taskClassName; - private int inputDelayMicroSec = 0; - - public Builder taskClassName(String taskClassName) { - this.taskClassName = taskClassName; - return this; - } - - public Builder inputInstances(long inputInstances) { - this.inputInstances = inputInstances; - return this; - } - - public Builder samplingSize(long samplingSize) { - this.samplingSize = samplingSize; - return this; - } - - public Builder evaluationInstances(long evaluationInstances) { - this.evaluationInstances = evaluationInstances; - return this; - } - - public Builder classifiedInstances(long classifiedInstances) { - this.classifiedInstances = classifiedInstances; - return this; - } - - public Builder classificationsCorrect(float classificationsCorrect) { - this.classificationsCorrect = classificationsCorrect; - return this; - } - - public Builder kappaStat(float kappaStat) { - this.kappaStat = kappaStat; - return this; - } - - public Builder kappaTempStat(float kappaTempStat) { - this.kappaTempStat = kappaTempStat; - return this; - } - - public Builder cliStringTemplate(String cliStringTemplate) { - this.cliStringTemplate = cliStringTemplate; - return this; - } - - public Builder resultFilePollTimeout(int pollTimeoutSeconds) { - this.pollTimeoutSeconds = pollTimeoutSeconds; - return this; - } - - public Builder inputDelayMicroSec(int inputDelayMicroSec) { - this.inputDelayMicroSec = inputDelayMicroSec; - return this; - } - - public Builder prePollWait(int prePollWaitSeconds) { - this.prePollWaitSeconds = prePollWaitSeconds; - return this; - } - - public TestParams build() { - return new TestParams(taskClassName, - inputInstances, - samplingSize, - evaluationInstances, - classifiedInstances, - classificationsCorrect, - kappaStat, - kappaTempStat, - cliStringTemplate, - pollTimeoutSeconds, - prePollWaitSeconds, - inputDelayMicroSec); - } + public TestParams build() { + return new TestParams(taskClassName, + inputInstances, + samplingSize, + evaluationInstances, + classifiedInstances, + classificationsCorrect, + kappaStat, + kappaTempStat, + cliStringTemplate, + pollTimeoutSeconds, + prePollWaitSeconds, + inputDelayMicroSec); } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java index d66f5df..c36706c 100644 --- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java +++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java @@ -41,113 +41,112 @@ import static org.junit.Assert.assertTrue; public class TestUtils { - private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName()); - - - public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException { - - final File tempFile = File.createTempFile("test", "test"); - - LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString()); - - Executors.newSingleThreadExecutor().submit(new Callable<Void>() { - - @Override - public Void call() throws Exception { - try { - Class.forName(testParams.getTaskClassName()) - .getMethod("main", String[].class) - .invoke(null, (Object) String.format( - testParams.getCliStringTemplate(), - tempFile.getAbsolutePath(), - testParams.getInputInstances(), - testParams.getSamplingSize(), - testParams.getInputDelayMicroSec() - ).split("[ ]")); - } catch (Exception e) { - LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage()); - } - return null; - } - }); - - Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds())); - - CountDownLatch signalComplete = new CountDownLatch(1); - - final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000); - new Thread(new Runnable() { - @Override - public void run() { - tailer.run(); - } - }).start(); - - signalComplete.await(); - tailer.stop(); - - assertResults(tempFile, testParams); - } - - public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException { - - LOG.info("Checking results file " + outputFile.getAbsolutePath()); - // 1. parse result file with csv parser - Reader in = new FileReader(outputFile); - Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false) - .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in); - CSVRecord last = null; - Iterator<CSVRecord> iterator = records.iterator(); - CSVRecord header = iterator.next(); - Assert.assertEquals("Invalid number of columns", 5, header.size()); - - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2).trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim()); - - // 2. check last line result - while (iterator.hasNext()) { - last = iterator.next(); + private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName()); + + public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, + NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException { + + final File tempFile = File.createTempFile("test", "test"); + + LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString()); + + Executors.newSingleThreadExecutor().submit(new Callable<Void>() { + + @Override + public Void call() throws Exception { + try { + Class.forName(testParams.getTaskClassName()) + .getMethod("main", String[].class) + .invoke(null, (Object) String.format( + testParams.getCliStringTemplate(), + tempFile.getAbsolutePath(), + testParams.getInputInstances(), + testParams.getSamplingSize(), + testParams.getInputDelayMicroSec() + ).split("[ ]")); + } catch (Exception e) { + LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage()); } - - assertTrue(String.format("Unmet threshold expected %d got %f", - testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))), - testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0))); - assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(), - Float.parseFloat(last.get(1))), - testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1))); - assertTrue(String.format("Unmet threshold expected %f got %f", - testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))), - testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2))); - assertTrue(String.format("Unmet threshold expected %f got %f", - testParams.getKappaStat(), Float.parseFloat(last.get(3))), - testParams.getKappaStat() <= Float.parseFloat(last.get(3))); - assertTrue(String.format("Unmet threshold expected %f got %f", - testParams.getKappaTempStat(), Float.parseFloat(last.get(4))), - testParams.getKappaTempStat() <= Float.parseFloat(last.get(4))); - + return null; + } + }); + + Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds())); + + CountDownLatch signalComplete = new CountDownLatch(1); + + final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000); + new Thread(new Runnable() { + @Override + public void run() { + tailer.run(); + } + }).start(); + + signalComplete.await(); + tailer.stop(); + + assertResults(tempFile, testParams); + } + + public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException { + + LOG.info("Checking results file " + outputFile.getAbsolutePath()); + // 1. parse result file with csv parser + Reader in = new FileReader(outputFile); + Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false) + .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in); + CSVRecord last = null; + Iterator<CSVRecord> iterator = records.iterator(); + CSVRecord header = iterator.next(); + Assert.assertEquals("Invalid number of columns", 5, header.size()); + + Assert + .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim()); + Assert + .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2) + .trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim()); + + // 2. check last line result + while (iterator.hasNext()) { + last = iterator.next(); } - - private static class TestResultsTailerAdapter extends TailerListenerAdapter { - - private final CountDownLatch signalComplete; - - public TestResultsTailerAdapter(CountDownLatch signalComplete) { - this.signalComplete = signalComplete; - } - - @Override - public void handle(String line) { - if ("# COMPLETED".equals(line.trim())) { - signalComplete.countDown(); - } - } + assertTrue(String.format("Unmet threshold expected %d got %f", + testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))), + testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0))); + assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(), + Float.parseFloat(last.get(1))), + testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))), + testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getKappaStat(), Float.parseFloat(last.get(3))), + testParams.getKappaStat() <= Float.parseFloat(last.get(3))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getKappaTempStat(), Float.parseFloat(last.get(4))), + testParams.getKappaTempStat() <= Float.parseFloat(last.get(4))); + + } + + private static class TestResultsTailerAdapter extends TailerListenerAdapter { + + private final CountDownLatch signalComplete; + + public TestResultsTailerAdapter(CountDownLatch signalComplete) { + this.signalComplete = signalComplete; } - - + @Override + public void handle(String line) { + if ("# COMPLETED".equals(line.trim())) { + signalComplete.countDown(); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java index 21ccf9e..2ac9ec1 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java @@ -13,58 +13,58 @@ import com.yahoo.labs.samoa.topology.impl.ThreadsEngine; /** * @author Anh Thu Vu - * + * */ public class LocalThreadsDoTask { - private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class); + private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class); - /** - * The main method. - * - * @param args - * the arguments - */ - public static void main(String[] args) { + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { - ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - - // Get number of threads for multithreading mode - int numThreads = 1; - for (int i=0; i<tmpArgs.size()-1; i++) { - if (tmpArgs.get(i).equals("-t")) { - try { - numThreads = Integer.parseInt(tmpArgs.get(i+1)); - tmpArgs.remove(i+1); - tmpArgs.remove(i); - } catch (NumberFormatException e) { - System.err.println("Invalid number of threads."); - System.err.println(e.getStackTrace()); - } - } - } - logger.info("Number of threads:{}", numThreads); - - args = tmpArgs.toArray(new String[0]); - - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } - logger.debug("Command line string = {}", cliString.toString()); - System.out.println("Command line string = " + cliString.toString()); + ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - Task task = null; + // Get number of threads for multithreading mode + int numThreads = 1; + for (int i = 0; i < tmpArgs.size() - 1; i++) { + if (tmpArgs.get(i).equals("-t")) { try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); - logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); - } catch (Exception e) { - logger.error("Fail to initialize the task", e); - System.out.println("Fail to initialize the task" + e); - return; + numThreads = Integer.parseInt(tmpArgs.get(i + 1)); + tmpArgs.remove(i + 1); + tmpArgs.remove(i); + } catch (NumberFormatException e) { + System.err.println("Invalid number of threads."); + System.err.println(e.getStackTrace()); } - task.setFactory(new ThreadsComponentFactory()); - task.init(); - - ThreadsEngine.submitTopology(task.getTopology(), numThreads); + } } + logger.info("Number of threads:{}", numThreads); + + args = tmpArgs.toArray(new String[0]); + + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task = null; + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new ThreadsComponentFactory()); + task.init(); + + ThreadsEngine.submitTopology(task.getTopology(), numThreads); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java index ac68da2..91f213b 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java @@ -31,34 +31,35 @@ import com.yahoo.labs.samoa.topology.Topology; /** * ComponentFactory for multithreaded engine + * * @author Anh Thu Vu - * + * */ public class ThreadsComponentFactory implements ComponentFactory { - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } - @Override - public ProcessingItem createPi(Processor processor, int paralellism) { - return new ThreadsProcessingItem(processor, paralellism); - } + @Override + public ProcessingItem createPi(Processor processor, int paralellism) { + return new ThreadsProcessingItem(processor, paralellism); + } - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - return new ThreadsEntranceProcessingItem(entranceProcessor); - } + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new ThreadsEntranceProcessingItem(entranceProcessor); + } - @Override - public Stream createStream(IProcessingItem sourcePi) { - return new ThreadsStream(sourcePi); - } + @Override + public Stream createStream(IProcessingItem sourcePi) { + return new ThreadsStream(sourcePi); + } - @Override - public Topology createTopology(String topoName) { - return new ThreadsTopology(topoName); - } + @Override + public Topology createTopology(String topoName) { + return new ThreadsTopology(topoName); + } }
