// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
// deleted file mode 100644 index cba6b28..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java +++ /dev/null @@ -1,212 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.Map;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.core.EntranceProcessor;
import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
import com.yahoo.labs.samoa.topology.Stream;

/**
 * EntranceProcessingItem implementation for Storm.
 * <p>
 * The wrapped {@link EntranceProcessor} is driven by a {@link StormEntranceSpout}, which is registered in the Storm
 * topology under this item's name.
 */
class StormEntranceProcessingItem extends AbstractEntranceProcessingItem implements StormTopologyNode {
  private final StormEntranceSpout piSpout;

  /** Creates an entrance PI whose name is a random UUID. */
  StormEntranceProcessingItem(EntranceProcessor processor) {
    this(processor, UUID.randomUUID().toString());
  }

  /** Creates an entrance PI that is registered in Storm under {@code friendlyId}. */
  StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) {
    super(processor);
    this.setName(friendlyId);
    this.piSpout = new StormEntranceSpout(processor);
  }

  /**
   * Sets the single output stream of the underlying spout.
   *
   * @param stream
   *          the output stream; must be a {@link StormStream} (the cast throws ClassCastException otherwise)
   * @return this item
   */
  @Override
  public EntranceProcessingItem setOutputStream(Stream stream) {
    piSpout.setOutputStream((StormStream) stream);
    return this;
  }

  @Override
  public Stream getOutputStream() {
    return piSpout.getOutputStream();
  }

  /** Registers the spout with the topology's Storm builder under this item's name. */
  @Override
  public void addToTopology(StormTopology topology, int parallelismHint) {
    topology.getStormBuilder().setSpout(this.getName(), piSpout, parallelismHint);
  }

  @Override
  public StormStream createStream() {
    return piSpout.createStream(this.getName());
  }

  @Override
  public String getId() {
    return this.getName();
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(super.toString());
    sb.insert(0, String.format("id: %s, ", this.getName()));
    return sb.toString();
  }

  /**
   * Resulting Spout of StormEntranceProcessingItem. Pulls events from the entrance processor and emits each one as a
   * (content event, key) tuple on the configured output stream.
   */
  static final class StormEntranceSpout extends BaseRichSpout {

    private static final long serialVersionUID = -9066409791668954099L;

    /**
     * Back-off used by {@link #nextTuple()} when the processor has no event ready. Storm calls nextTuple, ack and fail
     * in a tight loop on a single thread and recommends only a short sleep (about one millisecond) here. The previous
     * one-second sleep stalled ack/fail handling and added up to a second of latency to every idle gap.
     */
    private static final long IDLE_SLEEP_MILLIS = 1;

    private final EntranceProcessor entranceProcessor;
    private StormStream outputStream;

    private SpoutOutputCollector collector;

    StormEntranceSpout(EntranceProcessor processor) {
      this.entranceProcessor = processor;
    }

    public StormStream getOutputStream() {
      return outputStream;
    }

    public void setOutputStream(StormStream stream) {
      this.outputStream = stream;
    }

    /** Stores the collector and initializes the processor with this task's id. */
    @Override
    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.collector = collector;
      this.entranceProcessor.onCreate(context.getThisTaskId());
    }

    /** Emits the next processor event if one is available; otherwise backs off briefly. */
    @Override
    public void nextTuple() {
      if (entranceProcessor.hasNext()) {
        Values value = newValues(entranceProcessor.nextEvent());
        collector.emit(outputStream.getOutputId(), value);
      } else {
        Utils.sleep(IDLE_SLEEP_MILLIS);
      }
    }

    /**
     * Declares the single output stream with fields (content event, key).
     *
     * @throws IllegalStateException
     *           if no output stream has been set, instead of failing later with a bare NullPointerException
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      if (outputStream == null) {
        throw new IllegalStateException(
            "No output stream set on entrance spout; call setOutputStream() before building the topology");
      }
      declarer.declareStream(outputStream.getOutputId(),
          new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD));
    }

    /** Creates a new stream whose input id refers to the component {@code piId}. */
    StormStream createStream(String piId) {
      return new StormBoltStream(piId);
    }

    /** Builds the tuple values in the order declared by declareOutputFields: (event, key). */
    private Values newValues(ContentEvent contentEvent) {
      return new Values(contentEvent, contentEvent.getKey());
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
// deleted file mode 100644 index bd7a666..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java +++ /dev/null @@ -1,75 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.utils.Utils;

/**
 * Utility class to submit samoa-storm jar to a Storm cluster.
 * <p>
 * Usage: {@code StormJarSubmitter <path-to-jar>}. The location returned by Nimbus is stored under
 * {@link #UPLOADED_JAR_LOCATION_KEY} in the samoa-storm cluster properties file.
 *
 * @author Arinto Murdopo
 *
 */
public class StormJarSubmitter {

  public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation";

  /**
   * @param args
   *          args[0] is the path of the jar to upload
   * @throws IOException
   *           if the existing properties file cannot be read or the updated one cannot be written
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 1) {
      System.err.println("Usage: StormJarSubmitter <path-to-jar>");
      return;
    }

    Config config = new Config();
    config.putAll(Utils.readCommandLineOpts());
    config.putAll(Utils.readStormConfig());

    String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
    int nimbusThriftPort = Utils.getInt(config.get(Config.NIMBUS_THRIFT_PORT));

    System.out.println("Nimbus host " + nimbusHost);
    System.out.println("Nimbus thrift port " + nimbusThriftPort);

    System.out.println("uploading jar from " + args[0]);
    String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]);

    System.out.println("Uploaded jar file location: ");
    System.out.println(uploadedJarLocation);

    Properties props = loadExistingProperties();
    props.setProperty(UPLOADED_JAR_LOCATION_KEY, uploadedJarLocation);

    // FileOutputStream creates the file if it is missing. Properties.store leaves the stream open, so close it here.
    OutputStream out = new FileOutputStream(new File(StormSamoaUtils.CLUSTER_PROPERTIES_PATH));
    try {
      props.store(out, "properties file to store uploaded jar location from StormJarSubmitter");
    } finally {
      out.close();
    }
  }

  /** Returns the current cluster properties, or an empty set when the file does not exist yet (first upload). */
  private static Properties loadExistingProperties() throws IOException {
    try {
      return StormSamoaUtils.getProperties();
    } catch (FileNotFoundException e) {
      return new Properties();
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
// deleted file mode 100644 index 509862b..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java +++ /dev/null @@ -1,169 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.core.Processor;
import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
import com.yahoo.labs.samoa.topology.ProcessingItem;
import com.yahoo.labs.samoa.topology.Stream;
import com.yahoo.labs.samoa.topology.impl.StormStream.InputStreamId;
import com.yahoo.labs.samoa.utils.PartitioningScheme;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * ProcessingItem implementation for Storm. The wrapped {@link Processor} runs inside a {@link ProcessingItemBolt}.
 *
 * @author Arinto Murdopo
 *
 */
class StormProcessingItem extends AbstractProcessingItem implements StormTopologyNode {
  private final ProcessingItemBolt piBolt;
  /** Set by addToTopology(); null until this PI has been added to a topology. */
  private BoltDeclarer piBoltDeclarer;

  // TODO: should we put parallelism hint here?
  // imo, parallelism hint only declared when we add this PI in the topology
  // open for discussion :p

  StormProcessingItem(Processor processor, int parallelismHint) {
    this(processor, UUID.randomUUID().toString(), parallelismHint);
  }

  StormProcessingItem(Processor processor, String friendlyId, int parallelismHint) {
    super(processor, parallelismHint);
    this.piBolt = new ProcessingItemBolt(processor);
    this.setName(friendlyId);
  }

  /**
   * Subscribes this PI's bolt to {@code inputStream} using the Storm grouping that matches {@code scheme}.
   *
   * @throws IllegalStateException
   *           if this PI has not been added to a topology yet (there is no bolt declarer to wire)
   * @throws IllegalArgumentException
   *           if the partitioning scheme is not supported
   */
  @Override
  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
    if (piBoltDeclarer == null) {
      throw new IllegalStateException("Processing item " + getName()
          + " must be added to a topology before input streams are connected");
    }
    StormStream stormInputStream = (StormStream) inputStream;
    InputStreamId inputId = stormInputStream.getInputId();
    String componentId = inputId.getComponentId();
    String streamId = inputId.getStreamId();

    switch (scheme) {
      case SHUFFLE:
        piBoltDeclarer.shuffleGrouping(componentId, streamId);
        break;
      case GROUP_BY_KEY:
        piBoltDeclarer.fieldsGrouping(componentId, streamId, new Fields(StormSamoaUtils.KEY_FIELD));
        break;
      case BROADCAST:
        piBoltDeclarer.allGrouping(componentId, streamId);
        break;
      default:
        throw new IllegalArgumentException("Unsupported partitioning scheme: " + scheme);
    }
    return this;
  }

  /**
   * Registers this PI's bolt with the topology's Storm builder.
   *
   * @throws IllegalStateException
   *           if this PI was already added to a topology; it wraps a single bolt and can only belong to one
   */
  @Override
  public void addToTopology(StormTopology topology, int parallelismHint) {
    if (piBoltDeclarer != null) {
      throw new IllegalStateException("Processing item " + getName() + " has already been added to a topology");
    }
    TopologyBuilder stormBuilder = topology.getStormBuilder();
    this.piBoltDeclarer = stormBuilder.setBolt(this.getName(), this.piBolt, parallelismHint);
  }

  @Override
  public StormStream createStream() {
    return piBolt.createStream(this.getName());
  }

  @Override
  public String getId() {
    return this.getName();
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(super.toString());
    sb.insert(0, String.format("id: %s, ", this.getName()));
    return sb.toString();
  }

  /** Storm bolt that feeds incoming content events to the processor and owns the PI's output streams. */
  private final static class ProcessingItemBolt extends BaseRichBolt {

    private static final long serialVersionUID = -6637673741263199198L;

    private final Set<StormBoltStream> streams;
    private final Processor processor;

    private OutputCollector collector;

    ProcessingItemBolt(Processor processor) {
      this.streams = new HashSet<StormBoltStream>();
      this.processor = processor;
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
        OutputCollector collector) {
      this.collector = collector;
      // Processor and this class share the same instance of stream
      for (StormBoltStream stream : streams) {
        stream.setCollector(this.collector);
      }

      this.processor.onCreate(context.getThisTaskId());
    }

    /** Field 0 of every tuple is the content event (see declareOutputFields). */
    @Override
    public void execute(Tuple input) {
      ContentEvent sentEvent = (ContentEvent) input.getValue(0);
      processor.process(sentEvent);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      for (StormStream stream : streams) {
        declarer.declareStream(stream.getOutputId(),
            new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD));
      }
    }

    StormStream createStream(String piId) {
      StormBoltStream stream = new StormBoltStream(piId);
      streams.add(stream);
      return stream;
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
// deleted file mode 100644 index bb9e3f4..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java +++ /dev/null @@ -1,109 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.github.javacliparser.ClassOption;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.labs.samoa.tasks.Task;

/**
 * Utility class for samoa-storm project. It is used by StormDoTask to process its arguments.
 *
 * @author Arinto Murdopo
 *
 */
public class StormSamoaUtils {

  private static final Logger logger = LoggerFactory.getLogger(StormSamoaUtils.class);

  static final String KEY_FIELD = "key";
  static final String CONTENT_EVENT_FIELD = "content_event";

  /** Cluster properties file written by StormJarSubmitter and read by StormTopologySubmitter. */
  static final String CLUSTER_PROPERTIES_PATH = "src/main/resources/samoa-storm-cluster.properties"; // FIXME it does not exist anymore

  /** Number of workers used when the last argument is absent or not an integer. */
  private static final int DEFAULT_NUM_WORKERS = 4;

  /**
   * Loads the samoa-storm cluster properties file.
   *
   * @return the loaded properties; never null
   * @throws java.io.FileNotFoundException
   *           if the file does not exist
   * @throws IOException
   *           if the file cannot be read (previously this returned null, which callers then dereferenced)
   */
  static Properties getProperties() throws IOException {
    Properties props = new Properties();
    InputStream is = new FileInputStream(new File(CLUSTER_PROPERTIES_PATH));
    try {
      props.load(is);
    } catch (IOException e) {
      logger.error("Fail to load property file", e);
      throw e;
    } finally {
      is.close();
    }
    return props;
  }

  /**
   * Builds the SAMOA task described by {@code args} and returns its Storm topology.
   *
   * @throws IllegalArgumentException
   *           if the arguments do not describe a valid task
   */
  public static StormTopology argsToTopology(String[] args) {
    StringBuilder cliString = new StringBuilder();
    for (String arg : args) {
      cliString.append(" ").append(arg);
    }
    logger.debug("Command line string = {}", cliString);

    Task task = getTask(cliString.toString());
    if (task == null) {
      throw new IllegalArgumentException("Unable to create a SAMOA task from arguments:" + cliString);
    }

    // TODO: remove setFactory method with DynamicBinding
    task.setFactory(new StormComponentFactory());
    task.init();

    return (StormTopology) task.getTopology();
  }

  /**
   * Extracts the number of workers from the last element of {@code tmpArgs}. If it parses as an integer, it is
   * removed from the list and returned; otherwise (including an empty list) the list is left untouched and the
   * default of 4 is returned.
   */
  public static int numWorkers(List<String> tmpArgs) {
    if (tmpArgs.isEmpty()) {
      return DEFAULT_NUM_WORKERS;
    }
    int position = tmpArgs.size() - 1;
    try {
      int numWorkers = Integer.parseInt(tmpArgs.get(position));
      tmpArgs.remove(position); // int overload: removes by index
      return numWorkers;
    } catch (NumberFormatException e) {
      return DEFAULT_NUM_WORKERS;
    }
  }

  /**
   * Instantiates the task described by {@code cliString}.
   *
   * @return the task, or null if it could not be created (the cause is logged)
   */
  public static Task getTask(String cliString) {
    try {
      logger.debug("Providing task [{}]", cliString);
      return ClassOption.cliStringToObject(cliString, Task.class, null);
    } catch (Exception e) {
      logger.warn("Fail in initializing the task!", e);
      return null;
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
// deleted file mode 100644 index 0aab940..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java +++ /dev/null @@ -1,65 +0,0 @@
//
// StormSpoutStream.java contained only the SAMOA license header and commented-out code: a legacy StormStream
// that forwarded put() to StormEntranceSpout. Even its package declaration was commented out. The dead code is omitted.

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
// deleted file mode 100644 index ca1edc9..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java +++ /dev/null @@ -1,86 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.UUID;

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.topology.Stream;

/**
 * Abstract class to implement Storm Stream. Each instance gets a random output stream id; the pair (component id,
 * stream id) is exposed as an {@link InputStreamId} so downstream bolts can subscribe to it.
 *
 * @author Arinto Murdopo
 *
 */
abstract class StormStream implements Stream, java.io.Serializable {

  private static final long serialVersionUID = 281835563756514852L;
  protected final String outputStreamId;
  protected final InputStreamId inputStreamId;

  /**
   * @param stormComponentId
   *          id of the Storm component (spout or bolt) that emits on this stream
   */
  public StormStream(String stormComponentId) {
    this.outputStreamId = UUID.randomUUID().toString();
    this.inputStreamId = new InputStreamId(stormComponentId, this.outputStreamId);
  }

  @Override
  public abstract void put(ContentEvent contentEvent);

  String getOutputId() {
    return this.outputStreamId;
  }

  InputStreamId getInputId() {
    return this.inputStreamId;
  }

  /** Immutable (component id, stream id) pair identifying a Storm stream for subscription. */
  final static class InputStreamId implements java.io.Serializable {

    private static final long serialVersionUID = -7457995634133691295L;
    private final String componentId;
    private final String streamId;

    InputStreamId(String componentId, String streamId) {
      this.componentId = componentId;
      this.streamId = streamId;
    }

    String getComponentId() {
      return componentId;
    }

    String getStreamId() {
      return streamId;
    }
  }

  @Override
  public void setBatchSize(int batchSize) {
    // Ignore batch size
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
// deleted file mode 100644 index 8a76c55..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java +++ /dev/null @@ -1,53 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import backtype.storm.topology.TopologyBuilder;

import com.yahoo.labs.samoa.topology.IProcessingItem;
import com.yahoo.labs.samoa.topology.AbstractTopology;

/**
 * Adaptation of SAMOA topology in samoa-storm. Every processing item added here also registers itself with the
 * underlying Storm {@link TopologyBuilder}.
 *
 * @author Arinto Murdopo
 *
 */
public class StormTopology extends AbstractTopology {

  private final TopologyBuilder builder;

  public StormTopology(String topologyName) {
    super(topologyName);
    this.builder = new TopologyBuilder();
  }

  /**
   * Adds the item to the Storm builder, then to the SAMOA topology.
   *
   * @param procItem
   *          must implement StormTopologyNode (the cast throws ClassCastException otherwise)
   */
  @Override
  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
    StormTopologyNode stormNode = (StormTopologyNode) procItem;
    stormNode.addToTopology(this, parallelismHint);
    super.addProcessingItem(procItem, parallelismHint);
  }

  public TopologyBuilder getStormBuilder() {
    return builder;
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
// deleted file mode 100644 index 8122258..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java +++ /dev/null @@ -1,37 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

/**
 * Interface to represent a node in samoa-storm topology.
 *
 * @author Arinto Murdopo
 *
 */
interface StormTopologyNode {

  /** Registers this node's Storm component with {@code topology}'s builder. */
  void addToTopology(StormTopology topology, int parallelismHint);

  /** Creates a new output stream emitted by this node. */
  StormStream createStream();

  /** Returns the Storm component id of this node. */
  String getId();

}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
// diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
// deleted file mode 100644 index bbac521..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java +++ /dev/null @@ -1,132 +0,0 @@
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

/**
 * Helper class to submit SAMOA task into Storm without the need of submitting the jar file. The jar file must be
 * submitted first using StormJarSubmitter class.
 *
 * @author Arinto Murdopo
 *
 */
public class StormTopologySubmitter {

  public static final String YJP_OPTIONS_KEY = "YjpOptions";

  private static final Logger logger = LoggerFactory.getLogger(StormTopologySubmitter.class);

  /**
   * Builds the task described by {@code args} and submits it to Nimbus, using the jar location recorded by
   * StormJarSubmitter. If the last argument is an integer, it is used as the number of workers.
   */
  public static void main(String[] args) throws IOException {
    Properties props = StormSamoaUtils.getProperties();

    String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
    if (uploadedJarLocation == null) {
      logger.error("Invalid properties file. It must have key {}",
          StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
      return;
    }

    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
    int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);

    args = tmpArgs.toArray(new String[0]);
    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);

    Config conf = new Config();
    conf.putAll(Utils.readStormConfig());
    conf.putAll(Utils.readCommandLineOpts());
    conf.setDebug(false);
    conf.setNumWorkers(numWorkers);

    String profilerOption = props.getProperty(YJP_OPTIONS_KEY);
    if (profilerOption != null) {
      // Append the profiler options to any worker JVM options already configured.
      String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
      StringBuilder optionBuilder = new StringBuilder();
      if (topoWorkerChildOpts != null) {
        optionBuilder.append(topoWorkerChildOpts).append(' ');
      }
      optionBuilder.append(profilerOption);
      conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString());
    }

    // The configuration is passed to submitTopology as a JSON string.
    Map<String, Object> myConfigMap = new HashMap<String, Object>(conf);
    StringWriter out = new StringWriter();

    try {
      JSONValue.writeJSONString(myConfigMap, out);
    } catch (IOException e) {
      logger.error("Error in writing JSONString", e);
      return;
    }

    Config config = new Config();
    config.putAll(Utils.readStormConfig());

    NimbusClient nc = NimbusClient.getConfiguredClient(config);
    String topologyName = stormTopo.getTopologyName();
    try {
      System.out.println("Submitting topology with name: " + topologyName);
      nc.getClient().submitTopology(topologyName, uploadedJarLocation,
          out.toString(), stormTopo.getStormBuilder().createTopology());
      System.out.println(topologyName + " is successfully submitted");
    } catch (AlreadyAliveException aae) {
      System.out.println("Fail to submit " + topologyName
          + "\nError message: " + aae.get_msg());
    } catch (InvalidTopologyException ite) {
      logger.error("Invalid topology for " + topologyName, ite);
    } catch (TException te) {
      logger.error("Texception for " + topologyName, te);
    } finally {
      // Release the Thrift connection to Nimbus.
      nc.close();
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
// diff --git a/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java b/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
// new file mode 100644 index 0000000..f2b9c0c --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java @@ -0,0 +1,78 @@
package org.apache.samoa;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.samoa.topology.impl.StormSamoaUtils;
import org.apache.samoa.topology.impl.StormTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.utils.Utils;

/**
 * The main class to execute a SAMOA task in LOCAL mode in Storm.
 *
 * @author Arinto Murdopo
 *
 */
public class LocalStormDoTask {

  private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class);

  /** How long the topology runs in the local cluster before it is killed (10 minutes). */
  private static final long LOCAL_RUN_MILLIS = 600 * 1000L;

  /**
   * The main method. Builds the task from {@code args}, runs it in a local Storm cluster for a fixed time and then
   * shuts the cluster down. If the last argument is an integer, it is used as the maximum task parallelism.
   *
   * @param args
   *          the arguments
   */
  public static void main(String[] args) {

    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));

    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);

    args = tmpArgs.toArray(new String[0]);

    // convert the arguments into Storm topology
    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
    String topologyName = stormTopo.getTopologyName();

    Config conf = new Config();
    // Local mode does not read storm.yaml; only the settings below apply.
    conf.setDebug(false);

    // local mode
    conf.setMaxTaskParallelism(numWorker);

    LocalCluster cluster = new LocalCluster();
    try {
      cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology());
      logger.info("Topology {} submitted to the local cluster; running for {} ms", topologyName, LOCAL_RUN_MILLIS);

      Utils.sleep(LOCAL_RUN_MILLIS);

      cluster.killTopology(topologyName);
    } finally {
      // Always stop the in-process cluster, even if submission or the wait fails.
      cluster.shutdown();
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java
// diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java
// new file mode 100644 index 0000000..de9ef0a --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java @@ -0,0 +1,67 @@
package org.apache.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Values; + +/** + * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class + * + * @author Arinto Murdopo + * + */ +class StormBoltStream extends StormStream { + + /** + * + */ + private static final long serialVersionUID = -5712513402991550847L; + + private OutputCollector outputCollector; + + StormBoltStream(String stormComponentId) { + super(stormComponentId); + } + + @Override + public void put(ContentEvent contentEvent) { + outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey())); + } + + public void setCollector(OutputCollector outputCollector) { + this.outputCollector = outputCollector; + } + + // @Override + // public void setStreamId(String streamId) { + // // TODO Auto-generated method stub + // //this.outputStreamId = streamId; + // } + + @Override + public String getStreamId() { + // TODO Auto-generated method stub + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java new file mode 100644 index 0000000..353cffc --- /dev/null +++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java @@ -0,0 +1,90 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +/** + * Component factory implementation for samoa-storm + */ +public final class StormComponentFactory implements ComponentFactory { + + private final Map<String, Integer> processorList; + + public StormComponentFactory() { + processorList = new HashMap<>(); + } + + @Override + public ProcessingItem createPi(Processor processor) { + return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), 1); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new StormEntranceProcessingItem(processor, this.getComponentName(processor.getClass())); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + StormTopologyNode stormCompatiblePi = 
(StormTopologyNode) sourcePi; + return stormCompatiblePi.createStream(); + } + + @Override + public Topology createTopology(String topoName) { + return new StormTopology(topoName); + } + + private String getComponentName(Class<? extends Processor> clazz) { + StringBuilder componentName = new StringBuilder(clazz.getCanonicalName()); + String key = componentName.toString(); + Integer index; + + if (!processorList.containsKey(key)) { + index = 1; + } else { + index = processorList.get(key) + 1; + } + + processorList.put(key, index); + + componentName.append('_'); + componentName.append(index); + + return componentName.toString(); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java new file mode 100644 index 0000000..436db59 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java @@ -0,0 +1,118 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; +import backtype.storm.utils.Utils; + +/** + * The main class that used by samoa script to execute SAMOA task. + * + * @author Arinto Murdopo + * + */ +public class StormDoTask { + private static final Logger logger = LoggerFactory.getLogger(StormDoTask.class); + private static String localFlag = "local"; + private static String clusterFlag = "cluster"; + + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { + + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + boolean isLocal = isLocal(tmpArgs); + int numWorker = StormSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + // convert the arguments into Storm topology + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + String topologyName = stormTopo.getTopologyName(); + + Config conf = new Config(); + conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); + + if (isLocal) { + // local mode + conf.setMaxTaskParallelism(numWorker); + + backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); + cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); + + backtype.storm.utils.Utils.sleep(600 * 1000); + + cluster.killTopology(topologyName); + cluster.shutdown(); + + } else { + // cluster mode + conf.setNumWorkers(numWorker); + try { + backtype.storm.StormSubmitter.submitTopology(topologyName, conf, + stormTopo.getStormBuilder().createTopology()); + } catch (backtype.storm.generated.AlreadyAliveException ale) { + ale.printStackTrace(); + } catch (backtype.storm.generated.InvalidTopologyException ite) { + ite.printStackTrace(); + } + } + } + + private static 
boolean isLocal(List<String> tmpArgs) { + ExecutionMode executionMode = ExecutionMode.UNDETERMINED; + + int position = tmpArgs.size() - 1; + String flag = tmpArgs.get(position); + boolean isLocal = true; + + if (flag.equals(clusterFlag)) { + executionMode = ExecutionMode.CLUSTER; + isLocal = false; + } else if (flag.equals(localFlag)) { + executionMode = ExecutionMode.LOCAL; + isLocal = true; + } + + if (executionMode != ExecutionMode.UNDETERMINED) { + tmpArgs.remove(position); + } + + return isLocal; + } + + private enum ExecutionMode { + LOCAL, CLUSTER, UNDETERMINED + }; +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java new file mode 100644 index 0000000..1183fac --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java @@ -0,0 +1,212 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Map; +import java.util.UUID; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.AbstractEntranceProcessingItem; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.Stream; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; + +/** + * EntranceProcessingItem implementation for Storm. + */ +class StormEntranceProcessingItem extends AbstractEntranceProcessingItem implements StormTopologyNode { + private final StormEntranceSpout piSpout; + + StormEntranceProcessingItem(EntranceProcessor processor) { + this(processor, UUID.randomUUID().toString()); + } + + StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) { + super(processor); + this.setName(friendlyId); + this.piSpout = new StormEntranceSpout(processor); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + // piSpout.streams.add(stream); + piSpout.setOutputStream((StormStream) stream); + return this; + } + + @Override + public Stream getOutputStream() { + return piSpout.getOutputStream(); + } + + @Override + public void addToTopology(StormTopology topology, int parallelismHint) { + topology.getStormBuilder().setSpout(this.getName(), piSpout, parallelismHint); + } + + @Override + public StormStream createStream() { + return piSpout.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + + /** + * Resulting Spout of 
StormEntranceProcessingItem + */ + final static class StormEntranceSpout extends BaseRichSpout { + + private static final long serialVersionUID = -9066409791668954099L; + + // private final Set<StormSpoutStream> streams; + private final EntranceProcessor entranceProcessor; + private StormStream outputStream; + + // private transient SpoutStarter spoutStarter; + // private transient Executor spoutExecutors; + // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue; + + private SpoutOutputCollector collector; + + StormEntranceSpout(EntranceProcessor processor) { + // this.streams = new HashSet<StormSpoutStream>(); + this.entranceProcessor = processor; + } + + public StormStream getOutputStream() { + return outputStream; + } + + public void setOutputStream(StormStream stream) { + this.outputStream = stream; + } + + @Override + public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>(); + + // Processor and this class share the same instance of stream + // for (StormSpoutStream stream : streams) { + // stream.setSpout(this); + // } + // outputStream.setSpout(this); + + this.entranceProcessor.onCreate(context.getThisTaskId()); + // this.spoutStarter = new SpoutStarter(this.starter); + + // this.spoutExecutors = Executors.newSingleThreadExecutor(); + // this.spoutExecutors.execute(spoutStarter); + } + + @Override + public void nextTuple() { + if (entranceProcessor.hasNext()) { + Values value = newValues(entranceProcessor.nextEvent()); + collector.emit(outputStream.getOutputId(), value); + } else + Utils.sleep(1000); + // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, + // TimeUnit.MILLISECONDS); + // if (tupleInfo != null) { + // Values value = newValues(tupleInfo.getContentEvent()); + // collector.emit(tupleInfo.getStormStream().getOutputId(), value); + // } + } + + @Override + public void 
declareOutputFields(OutputFieldsDeclarer declarer) { + // for (StormStream stream : streams) { + // declarer.declareStream(stream.getOutputId(), new + // Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + // StormSamoaUtils.KEY_FIELD)); + // } + declarer.declareStream(outputStream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + StormSamoaUtils.KEY_FIELD)); + } + + StormStream createStream(String piId) { + // StormSpoutStream stream = new StormSpoutStream(piId); + StormStream stream = new StormBoltStream(piId); + // streams.add(stream); + return stream; + } + + // void put(StormSpoutStream stream, ContentEvent contentEvent) { + // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent)); + // } + + private Values newValues(ContentEvent contentEvent) { + return new Values(contentEvent, contentEvent.getKey()); + } + + // private final static class StormTupleInfo { + // + // private final StormStream stream; + // private final ContentEvent event; + // + // StormTupleInfo(StormStream stream, ContentEvent event) { + // this.stream = stream; + // this.event = event; + // } + // + // public StormStream getStormStream() { + // return this.stream; + // } + // + // public ContentEvent getContentEvent() { + // return this.event; + // } + // } + + // private final static class SpoutStarter implements Runnable { + // + // private final TopologyStarter topoStarter; + // + // SpoutStarter(TopologyStarter topoStarter) { + // this.topoStarter = topoStarter; + // } + // + // @Override + // public void run() { + // this.topoStarter.start(); + // } + // } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java new file mode 100644 
index 0000000..a3de798 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java @@ -0,0 +1,75 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Properties; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.utils.Utils; + +/** + * Utility class to submit samoa-storm jar to a Storm cluster. 
+ * + * @author Arinto Murdopo + * + */ +public class StormJarSubmitter { + + public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation"; + + /** + * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + + Config config = new Config(); + config.putAll(Utils.readCommandLineOpts()); + config.putAll(Utils.readStormConfig()); + + String nimbusHost = (String) config.get(Config.NIMBUS_HOST); + int nimbusThriftPort = Utils.getInt(config + .get(Config.NIMBUS_THRIFT_PORT)); + + System.out.println("Nimbus host " + nimbusHost); + System.out.println("Nimbus thrift port " + nimbusThriftPort); + + System.out.println("uploading jar from " + args[0]); + String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]); + + System.out.println("Uploaded jar file location: "); + System.out.println(uploadedJarLocation); + + Properties props = StormSamoaUtils.getProperties(); + props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, uploadedJarLocation); + + File f = new File("src/main/resources/samoa-storm-cluster.properties"); + f.createNewFile(); + + OutputStream out = new FileOutputStream(f); + props.store(out, "properties file to store uploaded jar location from StormJarSubmitter"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java new file mode 100644 index 0000000..28c3746 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java @@ -0,0 +1,169 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the 
Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.impl.StormStream.InputStreamId; +import org.apache.samoa.utils.PartitioningScheme; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; + +/** + * ProcessingItem implementation for Storm. + * + * @author Arinto Murdopo + * + */ +class StormProcessingItem extends AbstractProcessingItem implements StormTopologyNode { + private final ProcessingItemBolt piBolt; + private BoltDeclarer piBoltDeclarer; + + // TODO: should we put parallelism hint here? 
+ // imo, parallelism hint only declared when we add this PI in the topology + // open for dicussion :p + + StormProcessingItem(Processor processor, int parallelismHint) { + this(processor, UUID.randomUUID().toString(), parallelismHint); + } + + StormProcessingItem(Processor processor, String friendlyId, int parallelismHint) { + super(processor, parallelismHint); + this.piBolt = new ProcessingItemBolt(processor); + this.setName(friendlyId); + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StormStream stormInputStream = (StormStream) inputStream; + InputStreamId inputId = stormInputStream.getInputId(); + + switch (scheme) { + case SHUFFLE: + piBoltDeclarer.shuffleGrouping(inputId.getComponentId(), inputId.getStreamId()); + break; + case GROUP_BY_KEY: + piBoltDeclarer.fieldsGrouping( + inputId.getComponentId(), + inputId.getStreamId(), + new Fields(StormSamoaUtils.KEY_FIELD)); + break; + case BROADCAST: + piBoltDeclarer.allGrouping( + inputId.getComponentId(), + inputId.getStreamId()); + break; + } + return this; + } + + @Override + public void addToTopology(StormTopology topology, int parallelismHint) { + if (piBoltDeclarer != null) { + // throw exception that one PI only belong to one topology + } else { + TopologyBuilder stormBuilder = topology.getStormBuilder(); + this.piBoltDeclarer = stormBuilder.setBolt(this.getName(), + this.piBolt, parallelismHint); + } + } + + @Override + public StormStream createStream() { + return piBolt.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + + private final static class ProcessingItemBolt extends BaseRichBolt { + + private static final long serialVersionUID = -6637673741263199198L; + + private final Set<StormBoltStream> 
streams; + private final Processor processor; + + private OutputCollector collector; + + ProcessingItemBolt(Processor processor) { + this.streams = new HashSet<StormBoltStream>(); + this.processor = processor; + } + + @Override + public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + // Processor and this class share the same instance of stream + for (StormBoltStream stream : streams) { + stream.setCollector(this.collector); + } + + this.processor.onCreate(context.getThisTaskId()); + } + + @Override + public void execute(Tuple input) { + Object sentObject = input.getValue(0); + ContentEvent sentEvent = (ContentEvent) sentObject; + processor.process(sentEvent); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (StormStream stream : streams) { + declarer.declareStream(stream.getOutputId(), + new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + StormSamoaUtils.KEY_FIELD)); + } + } + + StormStream createStream(String piId) { + StormBoltStream stream = new StormBoltStream(piId); + streams.add(stream); + return stream; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java new file mode 100644 index 0000000..86a5578 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java @@ -0,0 +1,109 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.ClassOption; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + +import org.apache.samoa.tasks.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for samoa-storm project. It is used by StormDoTask to process its arguments. + * + * @author Arinto Murdopo + * + */ +public class StormSamoaUtils { + + private static final Logger logger = LoggerFactory.getLogger(StormSamoaUtils.class); + + static final String KEY_FIELD = "key"; + static final String CONTENT_EVENT_FIELD = "content_event"; + + static Properties getProperties() throws IOException { + Properties props = new Properties(); + InputStream is; + + File f = new File("src/main/resources/samoa-storm-cluster.properties"); // FIXME it does not exist anymore + is = new FileInputStream(f); + + try { + props.load(is); + } catch (IOException e1) { + System.out.println("Fail to load property file"); + return null; + } finally { + is.close(); + } + + return props; + } + + public static StormTopology argsToTopology(String[] args) { + StringBuilder cliString = new StringBuilder(); + for (String arg : args) { + cliString.append(" ").append(arg); + } + logger.debug("Command line string = {}", cliString.toString()); + + Task task = getTask(cliString.toString()); + + // TODO: remove setFactory method with DynamicBinding + task.setFactory(new StormComponentFactory()); + task.init(); + + return (StormTopology) 
task.getTopology(); + } + + public static int numWorkers(List<String> tmpArgs) { + int position = tmpArgs.size() - 1; + int numWorkers; + + try { + numWorkers = Integer.parseInt(tmpArgs.get(position)); + tmpArgs.remove(position); + } catch (NumberFormatException e) { + numWorkers = 4; + } + + return numWorkers; + } + + public static Task getTask(String cliString) { + Task task = null; + try { + logger.debug("Providing task [{}]", cliString); + task = ClassOption.cliStringToObject(cliString, Task.class, null); + } catch (Exception e) { + logger.warn("Fail in initializing the task!"); + e.printStackTrace(); + } + return task; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java new file mode 100644 index 0000000..14fc424 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java @@ -0,0 +1,66 @@ +package org.apache.samoa.topology.impl; +//package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +// +//import org.apache.samoa.core.ContentEvent; +//import org.apache.samoa.topology.impl.StormEntranceProcessingItem.StormEntranceSpout; +// +///** +// * Storm Stream that connects into Spout. It wraps the spout itself +// * @author Arinto Murdopo +// * +// */ +//final class StormSpoutStream extends StormStream{ +// +// /** +// * +// */ +// private static final long serialVersionUID = -7444653177614988650L; +// +// private StormEntranceSpout spout; +// +// StormSpoutStream(String stormComponentId) { +// super(stormComponentId); +// } +// +// @Override +// public void put(ContentEvent contentEvent) { +// spout.put(this, contentEvent); +// } +// +// void setSpout(StormEntranceSpout spout){ +// this.spout = spout; +// } +// +//// @Override +//// public void setStreamId(String stream) { +//// // TODO Auto-generated method stub +//// +//// } +// +// @Override +// public String getStreamId() { +// // TODO Auto-generated method stub +// return null; +// } +// +// } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java new file mode 100644 index 0000000..1c62389 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java @@ -0,0 +1,86 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.UUID;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.Stream;
+
+/**
+ * Base class for the Storm implementations of a SAMOA {@link Stream}. Every instance is given a
+ * random output stream id at construction, paired with the id of the Storm component that owns it.
+ *
+ * @author Arinto Murdopo
+ *
+ */
+abstract class StormStream implements Stream, java.io.Serializable {
+
+  private static final long serialVersionUID = 281835563756514852L;
+
+  /** Randomly generated (UUID) id of this stream. */
+  protected final String outputStreamId;
+
+  /** Owning component id together with {@link #outputStreamId}. */
+  protected final InputStreamId inputStreamId;
+
+  /**
+   * Creates a stream with a fresh random id.
+   *
+   * @param stormComponentId id of the Storm component this stream belongs to
+   */
+  public StormStream(String stormComponentId) {
+    final String generatedId = UUID.randomUUID().toString();
+    this.outputStreamId = generatedId;
+    this.inputStreamId = new InputStreamId(stormComponentId, generatedId);
+  }
+
+  @Override
+  public abstract void put(ContentEvent contentEvent);
+
+  /** Batching is not supported here; the requested size is ignored. */
+  @Override
+  public void setBatchSize(int batchSize) {
+    // intentionally a no-op
+  }
+
+  /** Returns the randomly generated id of this stream. */
+  String getOutputId() {
+    return outputStreamId;
+  }
+
+  /** Returns the (component id, stream id) pair for this stream. */
+  InputStreamId getInputId() {
+    return inputStreamId;
+  }
+
+  /** Immutable, serializable (component id, stream id) pair. */
+  static final class InputStreamId implements java.io.Serializable {
+
+    private static final long serialVersionUID = -7457995634133691295L;
+
+    private final String componentId;
+    private final String streamId;
+
+    InputStreamId(String componentId, String streamId) {
+      this.componentId = componentId;
+      this.streamId = streamId;
+    }
+
+    String getComponentId() {
+      return componentId;
+    }
+
+    String getStreamId() {
+      return streamId;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java
---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java new file mode 100644 index 0000000..51bb909 --- /dev/null +++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java @@ -0,0 +1,53 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.IProcessingItem;
+
+import backtype.storm.topology.TopologyBuilder;
+
+/**
+ * Adaptation of SAMOA topology in samoa-storm: a SAMOA topology backed by a Storm
+ * {@link TopologyBuilder} that every added processing item registers itself with.
+ *
+ * @author Arinto Murdopo
+ *
+ */
+public class StormTopology extends AbstractTopology {
+
+  /** Storm builder shared by all nodes of this topology; assigned once in the constructor. */
+  private final TopologyBuilder builder;
+
+  public StormTopology(String topologyName) {
+    super(topologyName);
+    this.builder = new TopologyBuilder();
+  }
+
+  /**
+   * Lets the item register itself with the Storm builder, then records it in the generic topology.
+   *
+   * @param procItem the processing item; must implement {@code StormTopologyNode}, otherwise a
+   *     {@link ClassCastException} is thrown
+   * @param parallelismHint parallelism forwarded to the node's {@code addToTopology}
+   */
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
+    StormTopologyNode stormNode = (StormTopologyNode) procItem;
+    stormNode.addToTopology(this, parallelismHint);
+    super.addProcessingItem(procItem, parallelismHint);
+  }
+
+  /** Returns the underlying Storm builder. */
+  public TopologyBuilder getStormBuilder() {
+    return builder;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
new file mode 100644
index 0000000..994fb9f
--- /dev/null
+++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
@@ -0,0 +1,37 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * A node of a samoa-storm topology: something that can register itself with a
+ * {@link StormTopology} and create the streams it emits.
+ *
+ * @author Arinto Murdopo
+ *
+ */
+interface StormTopologyNode {
+
+  /** Returns the identifier of this node. */
+  String getId();
+
+  /**
+   * Registers this node with the given topology.
+   *
+   * @param topology the topology being assembled
+   * @param parallelismHint requested parallelism for this node
+   */
+  void addToTopology(StormTopology topology, int parallelismHint);
+
+  /** Creates a new stream originating from this node. */
+  StormStream createStream();
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
new file mode 100644
index 0000000..25985df
--- /dev/null
+++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
@@ -0,0 +1,132 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift7.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Helper class to submit SAMOA task into Storm without the need of submitting the jar file. The jar file must be
+ * submitted first using StormJarSubmitter class.
+ *
+ * @author Arinto Murdopo
+ *
+ */
+public class StormTopologySubmitter {
+
+  /** Properties key whose value, if present, is appended to the worker child JVM options. */
+  public static final String YJP_OPTIONS_KEY = "YjpOptions";
+
+  private static final Logger logger = LoggerFactory.getLogger(StormTopologySubmitter.class);
+
+  /**
+   * Builds the SAMOA topology described by {@code args} and submits it to the configured Nimbus,
+   * pointing it at the jar location recorded in the samoa-storm properties file.
+   *
+   * @param args SAMOA task command line; an optional trailing integer is the number of workers
+   * @throws IOException if the samoa-storm properties file cannot be opened
+   */
+  public static void main(String[] args) throws IOException {
+    Properties props = StormSamoaUtils.getProperties();
+    if (props == null) {
+      // getProperties() returns null when the file could be opened but not parsed.
+      logger.error("Unable to load the samoa-storm properties file");
+      return;
+    }
+
+    String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+    if (uploadedJarLocation == null) {
+      logger.error("Invalid properties file. It must have key {}",
+          StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+      return;
+    }
+
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+    int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+
+    Config conf = new Config();
+    conf.putAll(Utils.readStormConfig());
+    conf.putAll(Utils.readCommandLineOpts());
+    conf.setDebug(false);
+    conf.setNumWorkers(numWorkers);
+
+    // Append the profiler options (if configured) after any existing worker child options.
+    String profilerOption = props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
+    if (profilerOption != null) {
+      String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+      StringBuilder optionBuilder = new StringBuilder();
+      if (topoWorkerChildOpts != null) {
+        optionBuilder.append(topoWorkerChildOpts);
+        optionBuilder.append(' ');
+      }
+      optionBuilder.append(profilerOption);
+      conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString());
+    }
+
+    // The topology configuration is passed to submitTopology as a JSON string.
+    Map<String, Object> myConfigMap = new HashMap<String, Object>(conf);
+    StringWriter out = new StringWriter();
+
+    try {
+      JSONValue.writeJSONString(myConfigMap, out);
+    } catch (IOException e) {
+      logger.error("Error in writing JSONString", e);
+      return;
+    }
+
+    // The Nimbus client is configured from the Storm config only (no command-line overrides).
+    Config config = new Config();
+    config.putAll(Utils.readStormConfig());
+
+    NimbusClient nc = NimbusClient.getConfiguredClient(config);
+    String topologyName = stormTopo.getTopologyName();
+    try {
+      System.out.println("Submitting topology with name: "
+          + topologyName);
+      nc.getClient().submitTopology(topologyName, uploadedJarLocation,
+          out.toString(), stormTopo.getStormBuilder().createTopology());
+      System.out.println(topologyName + " is successfully submitted");
+
+    } catch (AlreadyAliveException aae) {
+      logger.error("Fail to submit " + topologyName
+          + "\nError message: " + aae.get_msg());
+    } catch (InvalidTopologyException ite) {
+      logger.error("Invalid topology for " + topologyName, ite);
+    } catch (TException te) {
+      logger.error("Texception for " + topologyName, te);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
deleted file mode 100644
index f767fa8..0000000
--- a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.yahoo.labs.samoa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L% - */ - -import org.junit.Test; - -public class AlgosTest { - - @Test(timeout = 60000) - public void testVHTWithStorm() throws Exception { - - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(55f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) - .resultFilePollTimeout(30) - .prePollWait(15) - .taskClassName(LocalStormDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); - - } - - @Test(timeout = 120000) - public void testBaggingWithStorm() throws Exception { - TestParams baggingConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(180_000) - .classifiedInstances(190_000) - .classificationsCorrect(60f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) - .resultFilePollTimeout(40) - .prePollWait(20) - .taskClassName(LocalStormDoTask.class.getName()) - .build(); - TestUtils.test(baggingConfig); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java deleted file mode 100644 index 27696bd..0000000 --- a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in 
compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.assertEquals; -import mockit.Expectations; -import mockit.MockUp; -import mockit.Mocked; -import mockit.Tested; -import mockit.Verifications; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.TopologyBuilder; - -import com.yahoo.labs.samoa.core.Processor; - -public class StormProcessingItemTest { - private static final int PARRALLELISM_HINT_2 = 2; - private static final int PARRALLELISM_HINT_4 = 4; - private static final String ID = "id"; - @Tested - private StormProcessingItem pi; - @Mocked - private Processor processor; - @Mocked - private StormTopology topology; - @Mocked - private TopologyBuilder stormBuilder = new TopologyBuilder(); - - @Before - public void setUp() { - pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2); - } - - @Test - public void testAddToTopology() { - new Expectations() { - { - topology.getStormBuilder(); - result = stormBuilder; - - stormBuilder.setBolt(ID, (IRichBolt) any, anyInt); - result = new MockUp<BoltDeclarer>() { - }.getMockInstance(); - } - }; - - pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is ignored - - new Verifications() { - { - assertEquals(pi.getProcessor(), processor); - // TODO add methods to explore a topology and verify them - assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2); - assertEquals(pi.getId(), ID); - } - }; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java b/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java new file mode 100644 index 0000000..2208be9 --- /dev/null +++ b/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java @@ -0,0 +1,69 @@ +package org.apache.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+import org.apache.samoa.LocalStormDoTask;
+import org.apache.samoa.TestParams;
+import org.apache.samoa.TestUtils;
+import org.junit.Test;
+
+/**
+ * End-to-end tests that run SAMOA evaluation templates through {@link LocalStormDoTask} and hand
+ * the expected statistics to {@link TestUtils#test}.
+ */
+public class AlgosTest {
+
+  /** Runs the {@code PREQEVAL_VHT_RANDOMTREE} template on local Storm. */
+  @Test(timeout = 60_000)
+  public void testVHTWithStorm() throws Exception {
+    TestParams params = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30)
+        .prePollWait(15)
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(params);
+  }
+
+  /** Runs the {@code PREQEVAL_BAGGING_RANDOMTREE} template on local Storm. */
+  @Test(timeout = 120_000)
+  public void testBaggingWithStorm() throws Exception {
+    TestParams params = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(190_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .resultFilePollTimeout(40)
+        .prePollWait(20)
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(params);
+  }
+}
