http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java index 1351159..da5644d 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java @@ -35,154 +35,154 @@ import com.yahoo.labs.samoa.topology.ProcessingItem; import com.yahoo.labs.samoa.topology.Stream; /** - * S4 Platform platform specific processing item, inherits from S4 ProcessinElemnt. + * S4 Platform platform specific processing item, inherits from S4 + * ProcessinElemnt. * * @author severien - * + * */ public class S4ProcessingItem extends ProcessingElement implements - ProcessingItem { - - public static final Logger logger = LoggerFactory - .getLogger(S4ProcessingItem.class); - - private Processor processor; - private int paralellismLevel; - private S4DoTask app; - - private static final String NAME="PROCESSING-ITEM-"; - private static int OBJ_COUNTER=0; - - /** - * Constructor of S4 ProcessingItem. - * - * @param app : S4 application - */ - public S4ProcessingItem(App app) { - super(app); - super.setName(NAME+OBJ_COUNTER); - OBJ_COUNTER++; - this.app = (S4DoTask) app; - this.paralellismLevel = 1; - } - - @Override - public String getName() { - return super.getName(); - } - - /** - * Gets processing item paralellism level. - * - * @return int - */ - public int getParalellismLevel() { - return paralellismLevel; - } - - /** - * Sets processing item paralellism level. - * - * @param paralellismLevel - */ - public void setParalellismLevel(int paralellismLevel) { - this.paralellismLevel = paralellismLevel; - } - - /** - * onEvent method. - * - * @param event - */ - public void onEvent(S4Event event) { - if (processor.process(event.getContentEvent()) == true) { - close(); - } - } - - /** - * Sets S4 processing item processor. - * - * @param processor - */ - public void setProcessor(Processor processor) { - this.processor = processor; - } - - // Methods from ProcessingItem - @Override - public Processor getProcessor() { - return processor; - } - - /** - * KeyFinder sets the keys for a specific event. - * - * @return KeyFinder - */ - private KeyFinder<S4Event> getKeyFinder() { - KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() { - @Override - public List<String> get(S4Event s4event) { - List<String> results = new ArrayList<String>(); - results.add(s4event.getKey()); - return results; - } - }; - - return keyFinder; - } - - - @Override - public ProcessingItem connectInputAllStream(Stream inputStream) { - - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.BROADCAST); - return this; - } - - - @Override - public ProcessingItem connectInputKeyStream(Stream inputStream) { - - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this,S4Stream.GROUP_BY_KEY); - - return this; - } - - @Override - public ProcessingItem connectInputShuffleStream(Stream inputStream) { - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this,S4Stream.SHUFFLE); - - return this; - } - - // Methods from ProcessingElement - @Override - protected void onCreate() { - logger.debug("PE ID {}", getId()); - if (this.processor != null) { - this.processor = this.processor.newProcessor(this.processor); - this.processor.onCreate(Integer.parseInt(getId())); - } - } - - @Override - protected void onRemove() { - // do nothing - } - - @Override - public int getParallelism() { - return this.paralellismLevel; - } + ProcessingItem { + + public static final Logger logger = LoggerFactory + .getLogger(S4ProcessingItem.class); + + private Processor processor; + private int paralellismLevel; + private S4DoTask app; + + private static final String NAME = "PROCESSING-ITEM-"; + private static int OBJ_COUNTER = 0; + + /** + * Constructor of S4 ProcessingItem. + * + * @param app + * : S4 application + */ + public S4ProcessingItem(App app) { + super(app); + super.setName(NAME + OBJ_COUNTER); + OBJ_COUNTER++; + this.app = (S4DoTask) app; + this.paralellismLevel = 1; + } + + @Override + public String getName() { + return super.getName(); + } + + /** + * Gets processing item paralellism level. + * + * @return int + */ + public int getParalellismLevel() { + return paralellismLevel; + } + + /** + * Sets processing item paralellism level. + * + * @param paralellismLevel + */ + public void setParalellismLevel(int paralellismLevel) { + this.paralellismLevel = paralellismLevel; + } + + /** + * onEvent method. + * + * @param event + */ + public void onEvent(S4Event event) { + if (processor.process(event.getContentEvent()) == true) { + close(); + } + } + + /** + * Sets S4 processing item processor. + * + * @param processor + */ + public void setProcessor(Processor processor) { + this.processor = processor; + } + + // Methods from ProcessingItem + @Override + public Processor getProcessor() { + return processor; + } + + /** + * KeyFinder sets the keys for a specific event. + * + * @return KeyFinder + */ + private KeyFinder<S4Event> getKeyFinder() { + KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() { + @Override + public List<String> get(S4Event s4event) { + List<String> results = new ArrayList<String>(); + results.add(s4event.getKey()); + return results; + } + }; + + return keyFinder; + } + + @Override + public ProcessingItem connectInputAllStream(Stream inputStream) { + + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.BROADCAST); + return this; + } + + @Override + public ProcessingItem connectInputKeyStream(Stream inputStream) { + + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.GROUP_BY_KEY); + + return this; + } + + @Override + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.SHUFFLE); + + return this; + } + + // Methods from ProcessingElement + @Override + protected void onCreate() { + logger.debug("PE ID {}", getId()); + if (this.processor != null) { + this.processor = this.processor.newProcessor(this.processor); + this.processor.onCreate(Integer.parseInt(getId())); + } + } + + @Override + protected void onRemove() { + // do nothing + } + + @Override + public int getParallelism() { + return this.paralellismLevel; + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java index 78a3266..67a1385 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java @@ -35,151 +35,151 @@ import com.yahoo.labs.samoa.topology.AbstractStream; * S4 Platform specific stream. * * @author severien - * + * */ public class S4Stream extends AbstractStream { - public static final int SHUFFLE = 0; - public static final int GROUP_BY_KEY = 1; - public static final int BROADCAST = 2; - - private static final Logger logger = LoggerFactory.getLogger(S4Stream.class); - - private S4DoTask app; - private int processingItemParalellism; - private int shuffleCounter; - - private static final String NAME = "STREAM-"; - private static int OBJ_COUNTER = 0; - - /* The stream list */ - public List<StreamType> streams; - - public S4Stream(S4DoTask app) { - super(); - this.app = app; - this.processingItemParalellism = 1; - this.shuffleCounter = 0; - this.streams = new ArrayList<StreamType>(); - this.setStreamId(NAME+OBJ_COUNTER); - OBJ_COUNTER++; - } - - public S4Stream(S4DoTask app, S4ProcessingItem pi) { - super(); - this.app = app; - this.processingItemParalellism = 1; - this.shuffleCounter = 0; - this.streams = new ArrayList<StreamType>(); - this.setStreamId(NAME+OBJ_COUNTER); - OBJ_COUNTER++; - - } - - /** - * - * @return - */ - public int getParallelism() { - return processingItemParalellism; - } - - public void setParallelism(int parallelism) { - this.processingItemParalellism = parallelism; - } - - public void addStream(String streamID, KeyFinder<S4Event> finder, - S4ProcessingItem pi, int type) { - String streamName = streamID +"_"+pi.getName(); - org.apache.s4.core.Stream<S4Event> stream = this.app.createStream( - streamName, pi); - stream.setName(streamName); - logger.debug("Stream name S4Stream {}", streamName); - if (finder != null) - stream.setKey(finder); - this.streams.add(new StreamType(stream, type)); - - } - - @Override - public void put(ContentEvent event) { - - for (int i = 0; i < streams.size(); i++) { - - switch (streams.get(i).getType()) { - case SHUFFLE: - S4Event s4event = new S4Event(event); - s4event.setStreamId(streams.get(i).getStream().getName()); - if(getParallelism() == 1) { - s4event.setKey("0"); - }else { - s4event.setKey(Integer.toString(shuffleCounter)); - } - streams.get(i).getStream().put(s4event); - shuffleCounter++; - if (shuffleCounter >= (getParallelism())) { - shuffleCounter = 0; - } - - break; - - case GROUP_BY_KEY: - S4Event s4event1 = new S4Event(event); - s4event1.setStreamId(streams.get(i).getStream().getName()); - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - String key = Integer.toString(hb.build() % getParallelism()); - s4event1.setKey(key); - streams.get(i).getStream().put(s4event1); - break; - - case BROADCAST: - for (int p = 0; p < this.getParallelism(); p++) { - S4Event s4event2 = new S4Event(event); - s4event2.setStreamId(streams.get(i).getStream().getName()); - s4event2.setKey(Integer.toString(p)); - streams.get(i).getStream().put(s4event2); - } - break; - - default: - break; - } - - - } - - } - - /** - * Subclass for definig stream connection type - * @author severien - * - */ - class StreamType { - org.apache.s4.core.Stream<S4Event> stream; - int type; - - public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) { - this.stream = s; - this.type = t; - } - - public org.apache.s4.core.Stream<S4Event> getStream() { - return stream; - } - - public void setStream(org.apache.s4.core.Stream<S4Event> stream) { - this.stream = stream; - } - - public int getType() { - return type; - } - - public void setType(int type) { - this.type = type; - } - - } + public static final int SHUFFLE = 0; + public static final int GROUP_BY_KEY = 1; + public static final int BROADCAST = 2; + + private static final Logger logger = LoggerFactory.getLogger(S4Stream.class); + + private S4DoTask app; + private int processingItemParalellism; + private int shuffleCounter; + + private static final String NAME = "STREAM-"; + private static int OBJ_COUNTER = 0; + + /* The stream list */ + public List<StreamType> streams; + + public S4Stream(S4DoTask app) { + super(); + this.app = app; + this.processingItemParalellism = 1; + this.shuffleCounter = 0; + this.streams = new ArrayList<StreamType>(); + this.setStreamId(NAME + OBJ_COUNTER); + OBJ_COUNTER++; + } + + public S4Stream(S4DoTask app, S4ProcessingItem pi) { + super(); + this.app = app; + this.processingItemParalellism = 1; + this.shuffleCounter = 0; + this.streams = new ArrayList<StreamType>(); + this.setStreamId(NAME + OBJ_COUNTER); + OBJ_COUNTER++; + + } + + /** + * + * @return + */ + public int getParallelism() { + return processingItemParalellism; + } + + public void setParallelism(int parallelism) { + this.processingItemParalellism = parallelism; + } + + public void addStream(String streamID, KeyFinder<S4Event> finder, + S4ProcessingItem pi, int type) { + String streamName = streamID + "_" + pi.getName(); + org.apache.s4.core.Stream<S4Event> stream = this.app.createStream( + streamName, pi); + stream.setName(streamName); + logger.debug("Stream name S4Stream {}", streamName); + if (finder != null) + stream.setKey(finder); + this.streams.add(new StreamType(stream, type)); + + } + + @Override + public void put(ContentEvent event) { + + for (int i = 0; i < streams.size(); i++) { + + switch (streams.get(i).getType()) { + case SHUFFLE: + S4Event s4event = new S4Event(event); + s4event.setStreamId(streams.get(i).getStream().getName()); + if (getParallelism() == 1) { + s4event.setKey("0"); + } else { + s4event.setKey(Integer.toString(shuffleCounter)); + } + streams.get(i).getStream().put(s4event); + shuffleCounter++; + if (shuffleCounter >= (getParallelism())) { + shuffleCounter = 0; + } + + break; + + case GROUP_BY_KEY: + S4Event s4event1 = new S4Event(event); + s4event1.setStreamId(streams.get(i).getStream().getName()); + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + String key = Integer.toString(hb.build() % getParallelism()); + s4event1.setKey(key); + streams.get(i).getStream().put(s4event1); + break; + + case BROADCAST: + for (int p = 0; p < this.getParallelism(); p++) { + S4Event s4event2 = new S4Event(event); + s4event2.setStreamId(streams.get(i).getStream().getName()); + s4event2.setKey(Integer.toString(p)); + streams.get(i).getStream().put(s4event2); + } + break; + + default: + break; + } + + } + + } + + /** + * Subclass for definig stream connection type + * + * @author severien + * + */ + class StreamType { + org.apache.s4.core.Stream<S4Event> stream; + int type; + + public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) { + this.stream = s; + this.type = t; + } + + public org.apache.s4.core.Stream<S4Event> getStream() { + return stream; + } + + public void setStream(org.apache.s4.core.Stream<S4Event> stream) { + this.stream = stream; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java index c7ef92c..cf5a9b3 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java @@ -44,103 +44,102 @@ import com.beust.jcommander.Parameters; public class S4Submitter implements ISubmitter { - private static Logger logger = LoggerFactory.getLogger(S4Submitter.class); - - @Override - public void deployTask(Task task) { - // TODO: Get application FROM HTTP server - // TODO: Initializa a http server to serve the app package - - String appURIString = null; -// File app = new File(System.getProperty("user.dir") -// + "/src/site/dist/SAMOA-S4-0.1-dist.jar"); - - // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar - try { - URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar"); - appURIString = appURL.toString(); - } catch (MalformedURLException e1) { - e1.printStackTrace(); - } - -// try { -// appURIString = app.toURI().toURL().toString(); -// } catch (MalformedURLException e) { -// e.printStackTrace(); -// } - if (task == null) { - logger.error("Can't execute since evaluation task is not set!"); - return; - } else { - logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ", - task.getClass().getSimpleName(), appURIString); - } - - String[] args = { "-c=testCluster2", - "-appClass=" + S4DoTask.class.getName(), - "-appName=" + "samoaApp", - "-p=evalTask=" + task.getClass().getSimpleName(), - "-zk=localhost:2181", "-s4r=" + appURIString , "-emc=" + SamoaSerializerModule.class.getName()}; - // "-emc=" + S4MOAModule.class.getName(), - // "@" + - // Resources.getResource("s4moa.properties").getFile(), - - S4Config s4config = new S4Config(); - JCommander jc = new JCommander(s4config); - jc.parse(args); - - Map<String, String> namedParameters = new HashMap<String, String>(); - for (String parameter : s4config.namedParameters) { - String[] param = parameter.split("="); - namedParameters.put(param[0], param[1]); - } - - AppConfig config = new AppConfig.Builder() - .appClassName(s4config.appClass).appName(s4config.appName) - .appURI(s4config.appURI).namedParameters(namedParameters) - .build(); - - DeploymentUtils.initAppConfig(config, s4config.clusterName, true, - s4config.zkString); - - System.out.println("Suposedly deployed on S4"); - } - - - public void initHTTPServer() { - - } - - @Parameters(separators = "=") - public static class S4Config { - - @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true) - String clusterName = null; - - @Parameter(names = "-appClass", description = "Main App class", required = false) - String appClass = null; - - @Parameter(names = "-appName", description = "Application name", required = false) - String appName = null; - - @Parameter(names = "-s4r", description = "Application URI", required = false) - String appURI = null; - - @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false) - String zkString = null; - - @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false) - List<String> extraModules = new ArrayList<String>(); - - @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration " - + "parameters, taking precedence over homonymous configuration parameters from configuration files. " - + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class) - List<String> namedParameters = new ArrayList<String>(); - - } - - @Override - public void setLocal(boolean bool) { - // TODO S4 works the same for local and distributed environments - } + private static Logger logger = LoggerFactory.getLogger(S4Submitter.class); + + @Override + public void deployTask(Task task) { + // TODO: Get application FROM HTTP server + // TODO: Initializa a http server to serve the app package + + String appURIString = null; + // File app = new File(System.getProperty("user.dir") + // + "/src/site/dist/SAMOA-S4-0.1-dist.jar"); + + // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar + try { + URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar"); + appURIString = appURL.toString(); + } catch (MalformedURLException e1) { + e1.printStackTrace(); + } + + // try { + // appURIString = app.toURI().toURL().toString(); + // } catch (MalformedURLException e) { + // e.printStackTrace(); + // } + if (task == null) { + logger.error("Can't execute since evaluation task is not set!"); + return; + } else { + logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ", + task.getClass().getSimpleName(), appURIString); + } + + String[] args = { "-c=testCluster2", + "-appClass=" + S4DoTask.class.getName(), + "-appName=" + "samoaApp", + "-p=evalTask=" + task.getClass().getSimpleName(), + "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + SamoaSerializerModule.class.getName() }; + // "-emc=" + S4MOAModule.class.getName(), + // "@" + + // Resources.getResource("s4moa.properties").getFile(), + + S4Config s4config = new S4Config(); + JCommander jc = new JCommander(s4config); + jc.parse(args); + + Map<String, String> namedParameters = new HashMap<String, String>(); + for (String parameter : s4config.namedParameters) { + String[] param = parameter.split("="); + namedParameters.put(param[0], param[1]); + } + + AppConfig config = new AppConfig.Builder() + .appClassName(s4config.appClass).appName(s4config.appName) + .appURI(s4config.appURI).namedParameters(namedParameters) + .build(); + + DeploymentUtils.initAppConfig(config, s4config.clusterName, true, + s4config.zkString); + + System.out.println("Suposedly deployed on S4"); + } + + public void initHTTPServer() { + + } + + @Parameters(separators = "=") + public static class S4Config { + + @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true) + String clusterName = null; + + @Parameter(names = "-appClass", description = "Main App class", required = false) + String appClass = null; + + @Parameter(names = "-appName", description = "Application name", required = false) + String appName = null; + + @Parameter(names = "-s4r", description = "Application URI", required = false) + String appURI = null; + + @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false) + String zkString = null; + + @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false) + List<String> extraModules = new ArrayList<String>(); + + @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration " + + "parameters, taking precedence over homonymous configuration parameters from configuration files. " + + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class) + List<String> namedParameters = new ArrayList<String>(); + + } + + @Override + public void setLocal(boolean bool) { + // TODO S4 works the same for local and distributed environments + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java index 6bef0e8..2f7661d 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java @@ -24,38 +24,40 @@ import com.yahoo.labs.samoa.topology.EntranceProcessingItem; import com.yahoo.labs.samoa.topology.AbstractTopology; public class S4Topology extends AbstractTopology { - - // CASEY: it seems evaluationTask is not used. - // Remove it for now - -// private String _evaluationTask; - -// S4Topology(String topoName, String evalTask) { -// super(topoName); -// } -// -// S4Topology(String topoName) { -// this(topoName, null); -// } - -// @Override -// public void setEvaluationTask(String evalTask) { -// _evaluationTask = evalTask; -// } -// -// @Override -// public String getEvaluationTask() { -// return _evaluationTask; -// } - - S4Topology(String topoName) { - super(topoName); - } - - protected EntranceProcessingItem getEntranceProcessingItem() { - if (this.getEntranceProcessingItems() == null) return null; - if (this.getEntranceProcessingItems().size() < 1) return null; - // TODO: support multiple entrance PIs - return (EntranceProcessingItem)this.getEntranceProcessingItems().toArray()[0]; - } + + // CASEY: it seems evaluationTask is not used. + // Remove it for now + + // private String _evaluationTask; + + // S4Topology(String topoName, String evalTask) { + // super(topoName); + // } + // + // S4Topology(String topoName) { + // this(topoName, null); + // } + + // @Override + // public void setEvaluationTask(String evalTask) { + // _evaluationTask = evalTask; + // } + // + // @Override + // public String getEvaluationTask() { + // return _evaluationTask; + // } + + S4Topology(String topoName) { + super(topoName); + } + + protected EntranceProcessingItem getEntranceProcessingItem() { + if (this.getEntranceProcessingItems() == null) + return null; + if (this.getEntranceProcessingItems().size() < 1) + return null; + // TODO: support multiple entrance PIs + return (EntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java index 4ae2296..61648e6 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java @@ -32,68 +32,69 @@ import com.google.inject.assistedinject.Assisted; import com.yahoo.labs.samoa.learners.classifiers.trees.AttributeContentEvent; import com.yahoo.labs.samoa.learners.classifiers.trees.ComputeContentEvent; -public class SamoaSerializer implements SerializerDeserializer{ +public class SamoaSerializer implements SerializerDeserializer { - private ThreadLocal<Kryo> kryoThreadLocal; - private ThreadLocal<Output> outputThreadLocal; + private ThreadLocal<Kryo> kryoThreadLocal; + private ThreadLocal<Output> outputThreadLocal; - private int initialBufferSize = 2048; - private int maxBufferSize = 256 * 1024; + private int initialBufferSize = 2048; + private int maxBufferSize = 256 * 1024; - public void setMaxBufferSize(int maxBufferSize) { - this.maxBufferSize = maxBufferSize; - } + public void setMaxBufferSize(int maxBufferSize) { + this.maxBufferSize = maxBufferSize; + } - /** - * - * @param classLoader - * classloader able to handle classes to serialize/deserialize. For instance, application-level events - * can only be handled by the application classloader. - */ - @Inject - public SamoaSerializer(@Assisted final ClassLoader classLoader) { - kryoThreadLocal = new ThreadLocal<Kryo>() { + /** + * + * @param classLoader + * classloader able to handle classes to serialize/deserialize. For + * instance, application-level events can only be handled by the + * application classloader. + */ + @Inject + public SamoaSerializer(@Assisted final ClassLoader classLoader) { + kryoThreadLocal = new ThreadLocal<Kryo>() { - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - kryo.setClassLoader(classLoader); - kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer()); - kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer()); - kryo.setRegistrationRequired(false); - return kryo; - } - }; + @Override + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + kryo.setClassLoader(classLoader); + kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer()); + kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer()); + kryo.setRegistrationRequired(false); + return kryo; + } + }; - outputThreadLocal = new ThreadLocal<Output>() { - @Override - protected Output initialValue() { - Output output = new Output(initialBufferSize, maxBufferSize); - return output; - } - }; + outputThreadLocal = new ThreadLocal<Output>() { + @Override + protected Output initialValue() { + Output output = new Output(initialBufferSize, maxBufferSize); + return output; + } + }; - } + } - @Override - public Object deserialize(ByteBuffer rawMessage) { - Input input = new Input(rawMessage.array()); - try { - return kryoThreadLocal.get().readClassAndObject(input); - } finally { - input.close(); - } + @Override + public Object deserialize(ByteBuffer rawMessage) { + Input input = new Input(rawMessage.array()); + try { + return kryoThreadLocal.get().readClassAndObject(input); + } finally { + input.close(); } + } - @SuppressWarnings("resource") - @Override - public ByteBuffer serialize(Object message) { - Output output = outputThreadLocal.get(); - try { - kryoThreadLocal.get().writeClassAndObject(output, message); - return ByteBuffer.wrap(output.toBytes()); - } finally { - output.clear(); - } + @SuppressWarnings("resource") + @Override + public ByteBuffer serialize(Object message) { + Output output = outputThreadLocal.get(); + try { + kryoThreadLocal.get().writeClassAndObject(output, message); + return ByteBuffer.wrap(output.toBytes()); + } finally { + output.clear(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java index 311e449..a367eb5 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java @@ -26,10 +26,10 @@ import com.google.inject.AbstractModule; public class SamoaSerializerModule extends AbstractModule { - @Override - protected void configure() { - bind(SerializerDeserializer.class).to(SamoaSerializer.class); - - } + @Override + protected void configure() { + bind(SerializerDeserializer.class).to(SamoaSerializer.class); + + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java index 45dd901..6c6103c 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java @@ -44,184 +44,183 @@ import com.yahoo.labs.samoa.utils.SystemsUtils; */ public class SamzaDoTask { - private static final Logger logger = LoggerFactory.getLogger(SamzaDoTask.class); - - private static final String LOCAL_MODE = "local"; - private static final String YARN_MODE = "yarn"; - - // FLAGS - private static final String YARN_CONF_FLAG = "--yarn_home"; - private static final String MODE_FLAG = "--mode"; - private static final String ZK_FLAG = "--zookeeper"; - private static final String KAFKA_FLAG = "--kafka"; - private static final String KAFKA_REPLICATION_FLAG = "--kafka_replication_factor"; - private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency"; - private static final String JAR_PACKAGE_FLAG = "--jar_package"; - private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir"; - private static final String AM_MEMORY_FLAG = "--yarn_am_mem"; - private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem"; - private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container"; - - private static final String KRYO_REGISTER_FLAG = "--kryo_register"; - - // config values - private static int kafkaReplicationFactor = 1; - private static int checkpointFrequency = 60000; - private static String kafka = "localhost:9092"; - private static String zookeeper = "localhost:2181"; - private static boolean isLocal = true; - private static String yarnConfHome = null; - private static String samoaHDFSDir = null; - private static String jarPackagePath = null; - private static int amMem = 1024; - private static int containerMem = 1024; - private static int piPerContainer = 2; - private static String kryoRegisterFile = null; - - /* - * 1. Read arguments - * 2. Construct topology/task - * 3. Upload the JAR to HDFS if we are running on YARN - * 4. Submit topology to SamzaEngine - */ - public static void main(String[] args) { - // Read arguments - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - parseArguments(tmpArgs); - - args = tmpArgs.toArray(new String[0]); - - // Init Task - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); + private static final Logger logger = LoggerFactory.getLogger(SamzaDoTask.class); + + private static final String LOCAL_MODE = "local"; + private static final String YARN_MODE = "yarn"; + + // FLAGS + private static final String YARN_CONF_FLAG = "--yarn_home"; + private static final String MODE_FLAG = "--mode"; + private static final String ZK_FLAG = "--zookeeper"; + private static final String KAFKA_FLAG = "--kafka"; + private static final String KAFKA_REPLICATION_FLAG = "--kafka_replication_factor"; + private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency"; + private static final String JAR_PACKAGE_FLAG = "--jar_package"; + private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir"; + private static final String AM_MEMORY_FLAG = "--yarn_am_mem"; + private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem"; + private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container"; + + private static final String KRYO_REGISTER_FLAG = "--kryo_register"; + + // config values + private static int kafkaReplicationFactor = 1; + private static int checkpointFrequency = 60000; + private static String kafka = "localhost:9092"; + private static String zookeeper = "localhost:2181"; + private static boolean isLocal = true; + private static String yarnConfHome = null; + private static String samoaHDFSDir = null; + private static String jarPackagePath = null; + private static int amMem = 1024; + private static int containerMem = 1024; + private static int piPerContainer = 2; + private static String kryoRegisterFile = null; + + /* + * 1. Read arguments 2. Construct topology/task 3. Upload the JAR to HDFS if + * we are running on YARN 4. Submit topology to SamzaEngine + */ + public static void main(String[] args) { + // Read arguments + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + parseArguments(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + // Init Task + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task = null; + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new SamzaComponentFactory()); + task.init(); + + // Upload JAR file to HDFS + String hdfsPath = null; + if (!isLocal) { + Path path = FileSystems.getDefault().getPath(jarPackagePath); + hdfsPath = uploadJarToHDFS(path.toFile()); + if (hdfsPath == null) { + System.out.println("Fail uploading JAR file \"" + path.toAbsolutePath().toString() + "\" to HDFS."); + return; + } + } + + // Set parameters + SamzaEngine.getEngine() + .setLocalMode(isLocal) + .setZooKeeper(zookeeper) + .setKafka(kafka) + .setYarnPackage(hdfsPath) + .setKafkaReplicationFactor(kafkaReplicationFactor) + .setConfigHome(yarnConfHome) + .setAMMemory(amMem) + .setContainerMemory(containerMem) + .setPiPerContainerRatio(piPerContainer) + .setKryoRegisterFile(kryoRegisterFile) + .setCheckpointFrequency(checkpointFrequency); + + // Submit topology + SamzaEngine.submitTopology((SamzaTopology) task.getTopology()); + + } + + private static boolean isLocalMode(String mode) { + return mode.equals(LOCAL_MODE); + } + + private static void parseArguments(List<String> args) { + for (int i = args.size() - 1; i >= 0; i--) { + String arg = args.get(i).trim(); + String[] splitted = arg.split("=", 2); + + if (splitted.length >= 2) { + // YARN config folder which contains conf/core-site.xml, + // conf/hdfs-site.xml, conf/yarn-site.xml + if (splitted[0].equals(YARN_CONF_FLAG)) { + yarnConfHome = splitted[1]; + args.remove(i); + } + // host:port for zookeeper cluster + else if (splitted[0].equals(ZK_FLAG)) { + zookeeper = splitted[1]; + args.remove(i); + } + // host:port,... for kafka broker(s) + else if (splitted[0].equals(KAFKA_FLAG)) { + kafka = splitted[1]; + args.remove(i); + } + // whether we are running Samza in Local mode or YARN mode + else if (splitted[0].equals(MODE_FLAG)) { + isLocal = isLocalMode(splitted[1]); + args.remove(i); + } + // memory requirement for YARN application master + else if (splitted[0].equals(AM_MEMORY_FLAG)) { + amMem = Integer.parseInt(splitted[1]); + args.remove(i); + } + // memory requirement for YARN worker container + else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) { + containerMem = Integer.parseInt(splitted[1]); + args.remove(i); } - logger.debug("Command line string = {}", cliString.toString()); - System.out.println("Command line string = " + cliString.toString()); - - Task task = null; - try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); - logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); - } catch (Exception e) { - logger.error("Fail to initialize the task", e); - System.out.println("Fail to initialize the task" + e); - return; + // the path to JAR archive that we need to upload to HDFS + else if (splitted[0].equals(JAR_PACKAGE_FLAG)) { + jarPackagePath = splitted[1]; + args.remove(i); } - task.setFactory(new SamzaComponentFactory()); - task.init(); - - // Upload JAR file to HDFS - String hdfsPath = null; - if (!isLocal) { - Path path = FileSystems.getDefault().getPath(jarPackagePath); - hdfsPath = uploadJarToHDFS(path.toFile()); - if(hdfsPath == null) { - System.out.println("Fail uploading JAR file \""+path.toAbsolutePath().toString()+"\" to HDFS."); - return; - } - } - - // Set parameters - SamzaEngine.getEngine() - .setLocalMode(isLocal) - .setZooKeeper(zookeeper) - .setKafka(kafka) - .setYarnPackage(hdfsPath) - .setKafkaReplicationFactor(kafkaReplicationFactor) - .setConfigHome(yarnConfHome) - .setAMMemory(amMem) - .setContainerMemory(containerMem) - .setPiPerContainerRatio(piPerContainer) - .setKryoRegisterFile(kryoRegisterFile) - .setCheckpointFrequency(checkpointFrequency); - - // Submit topology - SamzaEngine.submitTopology((SamzaTopology)task.getTopology()); - - } - - private static boolean isLocalMode(String mode) { - return mode.equals(LOCAL_MODE); - } - - private static void parseArguments(List<String> args) { - for (int i=args.size()-1; i>=0; i--) { - String arg = args.get(i).trim(); - String[] splitted = arg.split("=",2); - - if (splitted.length >= 2) { - // YARN config folder which contains conf/core-site.xml, - // conf/hdfs-site.xml, conf/yarn-site.xml - if (splitted[0].equals(YARN_CONF_FLAG)) { - yarnConfHome = splitted[1]; - args.remove(i); - } - // host:port for zookeeper cluster - else if (splitted[0].equals(ZK_FLAG)) { - zookeeper = splitted[1]; - args.remove(i); - } - // host:port,... for kafka broker(s) - else if (splitted[0].equals(KAFKA_FLAG)) { - kafka = splitted[1]; - args.remove(i); - } - // whether we are running Samza in Local mode or YARN mode - else if (splitted[0].equals(MODE_FLAG)) { - isLocal = isLocalMode(splitted[1]); - args.remove(i); - } - // memory requirement for YARN application master - else if (splitted[0].equals(AM_MEMORY_FLAG)) { - amMem = Integer.parseInt(splitted[1]); - args.remove(i); - } - // memory requirement for YARN worker container - else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) { - containerMem = Integer.parseInt(splitted[1]); - args.remove(i); - } - // the path to JAR archive that we need to upload to HDFS - else if (splitted[0].equals(JAR_PACKAGE_FLAG)) { - jarPackagePath = splitted[1]; - args.remove(i); - } - // the HDFS dir for SAMOA files - else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) { - samoaHDFSDir = splitted[1]; - if (samoaHDFSDir.length() < 1) samoaHDFSDir = null; - args.remove(i); - } - // number of max PI instances per container - // this will be used to compute the number of containers - // AM will request for the job - else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) { - piPerContainer = Integer.parseInt(splitted[1]); - args.remove(i); - } - // kafka streams replication factor - else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) { - kafkaReplicationFactor = Integer.parseInt(splitted[1]); - args.remove(i); - } - // checkpoint frequency in ms - else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) { - checkpointFrequency = Integer.parseInt(splitted[1]); - args.remove(i); - } - // the file contains registration information for Kryo serializer - else if (splitted[0].equals(KRYO_REGISTER_FLAG)) { - kryoRegisterFile = splitted[1]; - args.remove(i); - } - } - } - } - - private static String uploadJarToHDFS(File file) { - SystemsUtils.setHadoopConfigHome(yarnConfHome); - SystemsUtils.setSAMOADir(samoaHDFSDir); - return SystemsUtils.copyToHDFS(file, file.getName()); - } + // the HDFS dir for SAMOA files + else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) { + samoaHDFSDir = splitted[1]; + if (samoaHDFSDir.length() < 1) + samoaHDFSDir = null; + args.remove(i); + } + // number of max PI instances per container + // this will be used to compute the number of containers + // AM will request for the job + else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) { + piPerContainer = Integer.parseInt(splitted[1]); + args.remove(i); + } + // kafka streams replication factor + else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) { + kafkaReplicationFactor = Integer.parseInt(splitted[1]); + args.remove(i); + } + // checkpoint frequency in ms + else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) { + checkpointFrequency = Integer.parseInt(splitted[1]); + args.remove(i); + } + // the file contains registration information for Kryo serializer + else if (splitted[0].equals(KRYO_REGISTER_FLAG)) { + kryoRegisterFile = splitted[1]; + args.remove(i); + } + } + } + } + + private static String uploadJarToHDFS(File file) { + SystemsUtils.setHadoopConfigHome(yarnConfHome); + SystemsUtils.setSAMOADir(samoaHDFSDir); + return SystemsUtils.copyToHDFS(file, file.getName()); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java index 362e0a5..1a4b57f 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java @@ -32,26 +32,25 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer; /** - * Implementation of Samza's SystemFactory - * Samza will use this factory to get our custom consumer - * which gets the events from SAMOA EntranceProcessor - * and feed them to EntranceProcessingItem task + * Implementation of Samza's SystemFactory Samza will use this factory to get + * our custom consumer which gets the events from SAMOA EntranceProcessor and + * feed them to EntranceProcessingItem task * * @author Anh Thu Vu */ public class SamoaSystemFactory implements SystemFactory { - @Override - public SystemAdmin getAdmin(String systemName, Config config) { - return new SinglePartitionWithoutOffsetsSystemAdmin(); - } + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + return new SinglePartitionWithoutOffsetsSystemAdmin(); + } - @Override - public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - return new SamoaSystemConsumer(systemName, config); - } + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + return new SamoaSystemConsumer(systemName, config); + } - @Override - public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - throw new SamzaException("This implementation is not supposed to produce anything."); - } + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new SamzaException("This implementation is not supposed to produce anything."); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java index d71d97b..278b1f2 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java @@ -35,28 +35,28 @@ import com.yahoo.labs.samoa.topology.Topology; * @author Anh Thu Vu */ public class SamzaComponentFactory implements ComponentFactory { - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } - @Override - public ProcessingItem createPi(Processor processor, int parallelism) { - return new SamzaProcessingItem(processor, parallelism); - } + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new SamzaProcessingItem(processor, parallelism); + } - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - return new SamzaEntranceProcessingItem(entranceProcessor); - } - - @Override - public Stream createStream(IProcessingItem sourcePi) { - return new SamzaStream(sourcePi); - } - - @Override - public Topology createTopology(String topoName) { - return new SamzaTopology(topoName); - } + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new SamzaEntranceProcessingItem(entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + return new SamzaStream(sourcePi); + } + + @Override + public Topology createTopology(String topoName) { + return new SamzaTopology(topoName); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java index 7339443..e3141f8 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java @@ -35,163 +35,162 @@ import com.yahoo.labs.samoa.utils.SamzaConfigFactory; import com.yahoo.labs.samoa.utils.SystemsUtils; /** - * This class will submit a list of Samza jobs with - * the Configs generated from the input topology + * This class will submit a list of Samza jobs with the Configs generated from + * the input topology * * @author Anh Thu Vu - * + * */ public class SamzaEngine { - - private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class); - - /* - * Singleton instance - */ - private static SamzaEngine engine = new SamzaEngine(); - - private String zookeeper; - private String kafka; - private int kafkaReplicationFactor; - private boolean isLocalMode; - private String yarnPackagePath; - private String yarnConfHome; - - private String kryoRegisterFile; - - private int amMem; - private int containerMem; - private int piPerContainerRatio; - - private int checkpointFrequency; - - private void _submitTopology(SamzaTopology topology) { - - // Setup SamzaConfigFactory - SamzaConfigFactory configFactory = new SamzaConfigFactory(); - configFactory.setLocalMode(isLocalMode) - .setZookeeper(zookeeper) - .setKafka(kafka) - .setYarnPackage(yarnPackagePath) - .setAMMemory(amMem) - .setContainerMemory(containerMem) - .setPiPerContainerRatio(piPerContainerRatio) - .setKryoRegisterFile(kryoRegisterFile) - .setCheckpointFrequency(checkpointFrequency) - .setReplicationFactor(kafkaReplicationFactor); - - // Generate the list of Configs - List<MapConfig> configs; - try { - // ConfigFactory generate a list of configs - // Serialize a map of PIs and store in a file in the jar at jarFilePath - // (in dat/ folder) - configs = configFactory.getMapConfigsForTopology(topology); - } catch (Exception e) { - e.printStackTrace(); - return; - } - - // Create kafka streams - Set<Stream> streams = topology.getStreams(); - for (Stream stream:streams) { - SamzaStream samzaStream = (SamzaStream) stream; - List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams(); - for (SamzaSystemStream systemStream:systemStreams) { - // all streams should be kafka streams - SystemsUtils.createKafkaTopic(systemStream.getStream(),systemStream.getParallelism(),kafkaReplicationFactor); - } - } - - // Submit the jobs with those configs - for (MapConfig config:configs) { - logger.info("Config:{}",config); - JobRunner jobRunner = new JobRunner(config); - jobRunner.run(); - } - } - - private void _setupSystemsUtils() { - // Setup Utils - if (!isLocalMode) - SystemsUtils.setHadoopConfigHome(yarnConfHome); - SystemsUtils.setZookeeper(zookeeper); - } - - /* - * Setter methods - */ - public static SamzaEngine getEngine() { - return engine; - } - - public SamzaEngine setZooKeeper(String zk) { - this.zookeeper = zk; - return this; - } - - public SamzaEngine setKafka(String kafka) { - this.kafka = kafka; - return this; - } - - public SamzaEngine setKafkaReplicationFactor(int replicationFactor) { - this.kafkaReplicationFactor = replicationFactor; - return this; - } - - public SamzaEngine setCheckpointFrequency(int freq) { - this.checkpointFrequency = freq; - return this; - } - - public SamzaEngine setLocalMode(boolean isLocal) { - this.isLocalMode = isLocal; - return this; - } - - public SamzaEngine setYarnPackage(String yarnPackagePath) { - this.yarnPackagePath = yarnPackagePath; - return this; - } - - public SamzaEngine setConfigHome(String configHome) { - this.yarnConfHome = configHome; - return this; - } - - public SamzaEngine setAMMemory(int mem) { - this.amMem = mem; - return this; - } - - public SamzaEngine setContainerMemory(int mem) { - this.containerMem = mem; - return this; - } - - public SamzaEngine setPiPerContainerRatio(int piPerContainer) { - this.piPerContainerRatio = piPerContainer; - return this; - } - - public SamzaEngine setKryoRegisterFile(String registerFile) { - this.kryoRegisterFile = registerFile; - return this; - } - - /** - * Submit a list of Samza jobs correspond to the submitted - * topology - * - * @param topo - * the submitted topology - */ - public static void submitTopology(SamzaTopology topo) { - // Setup SystemsUtils - engine._setupSystemsUtils(); - - // Submit topology - engine._submitTopology(topo); - } + + private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class); + + /* + * Singleton instance + */ + private static SamzaEngine engine = new SamzaEngine(); + + private String zookeeper; + private String kafka; + private int kafkaReplicationFactor; + private boolean isLocalMode; + private String yarnPackagePath; + private String yarnConfHome; + + private String kryoRegisterFile; + + private int amMem; + private int containerMem; + private int piPerContainerRatio; + + private int checkpointFrequency; + + private void _submitTopology(SamzaTopology topology) { + + // Setup SamzaConfigFactory + SamzaConfigFactory configFactory = new SamzaConfigFactory(); + configFactory.setLocalMode(isLocalMode) + .setZookeeper(zookeeper) + .setKafka(kafka) + .setYarnPackage(yarnPackagePath) + .setAMMemory(amMem) + .setContainerMemory(containerMem) + .setPiPerContainerRatio(piPerContainerRatio) + .setKryoRegisterFile(kryoRegisterFile) + .setCheckpointFrequency(checkpointFrequency) + .setReplicationFactor(kafkaReplicationFactor); + + // Generate the list of Configs + List<MapConfig> configs; + try { + // ConfigFactory generate a list of configs + // Serialize a map of PIs and store in a file in the jar at jarFilePath + // (in dat/ folder) + configs = configFactory.getMapConfigsForTopology(topology); + } catch (Exception e) { + e.printStackTrace(); + return; + } + + // Create kafka streams + Set<Stream> streams = topology.getStreams(); + for (Stream stream : streams) { + SamzaStream samzaStream = (SamzaStream) stream; + List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams(); + for (SamzaSystemStream systemStream : systemStreams) { + // all streams should be kafka streams + SystemsUtils.createKafkaTopic(systemStream.getStream(), systemStream.getParallelism(), kafkaReplicationFactor); + } + } + + // Submit the jobs with those configs + for (MapConfig config : configs) { + logger.info("Config:{}", config); + JobRunner jobRunner = new JobRunner(config); + jobRunner.run(); + } + } + + private void _setupSystemsUtils() { + // Setup Utils + if (!isLocalMode) + SystemsUtils.setHadoopConfigHome(yarnConfHome); + SystemsUtils.setZookeeper(zookeeper); + } + + /* + * Setter methods + */ + public static SamzaEngine getEngine() { + return engine; + } + + public SamzaEngine setZooKeeper(String zk) { + this.zookeeper = zk; + return this; + } + + public SamzaEngine setKafka(String kafka) { + this.kafka = kafka; + return this; + } + + public SamzaEngine setKafkaReplicationFactor(int replicationFactor) { + this.kafkaReplicationFactor = replicationFactor; + return this; + } + + public SamzaEngine setCheckpointFrequency(int freq) { + this.checkpointFrequency = freq; + return this; + } + + public SamzaEngine setLocalMode(boolean isLocal) { + this.isLocalMode = isLocal; + return this; + } + + public SamzaEngine setYarnPackage(String yarnPackagePath) { + this.yarnPackagePath = yarnPackagePath; + return this; + } + + public SamzaEngine setConfigHome(String configHome) { + this.yarnConfHome = configHome; + return this; + } + + public SamzaEngine setAMMemory(int mem) { + this.amMem = mem; + return this; + } + + public SamzaEngine setContainerMemory(int mem) { + this.containerMem = mem; + return this; + } + + public SamzaEngine setPiPerContainerRatio(int piPerContainer) { + this.piPerContainerRatio = piPerContainer; + return this; + } + + public SamzaEngine setKryoRegisterFile(String registerFile) { + this.kryoRegisterFile = registerFile; + return this; + } + + /** + * Submit a list of Samza jobs correspond to the submitted topology + * + * @param topo + * the submitted topology + */ + public static void submitTopology(SamzaTopology topo) { + // Setup SystemsUtils + engine._setupSystemsUtils(); + + // Submit topology + engine._submitTopology(topo); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java index e89d789..6eea7cb 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java @@ -44,179 +44,196 @@ import com.yahoo.labs.samoa.utils.SamzaConfigFactory; import com.yahoo.labs.samoa.utils.SystemsUtils; /** - * EntranceProcessingItem for Samza - * which is also a Samza task (StreamTask & InitableTask) + * EntranceProcessingItem for Samza which is also a Samza task (StreamTask & + * InitableTask) * * @author Anh Thu Vu - * + * */ public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem - implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { + implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { - /** + /** * */ - private static final long serialVersionUID = 7157734520046135039L; - - /* - * Constructors - */ - public SamzaEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // Need this so Samza can initialize a StreamTask - public SamzaEntranceProcessingItem() {} - - /* - * Simple setters, getters - */ - @Override - public int addOutputStream(SamzaStream stream) { - this.setOutputStream(stream); - return 1; // entrance PI should have only 1 output stream - } - - /* - * Serialization - */ - private Object writeReplace() { - return new SerializationProxy(this); - } - - private static class SerializationProxy implements Serializable { - /** + private static final long serialVersionUID = 7157734520046135039L; + + /* + * Constructors + */ + public SamzaEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // Need this so Samza can initialize a StreamTask + public SamzaEntranceProcessingItem() { + } + + /* + * Simple setters, getters + */ + @Override + public int addOutputStream(SamzaStream stream) { + this.setOutputStream(stream); + return 1; // entrance PI should have only 1 output stream + } + + /* + * Serialization + */ + private Object writeReplace() { + return new SerializationProxy(this); + } + + private static class SerializationProxy implements Serializable { + /** * */ - private static final long serialVersionUID = 313907132721414634L; - - private EntranceProcessor processor; - private SamzaStream outputStream; - private String name; - - public SerializationProxy(SamzaEntranceProcessingItem epi) { - this.processor = epi.getProcessor(); - this.outputStream = (SamzaStream)epi.getOutputStream(); - this.name = epi.getName(); - } - } - - /* - * Implement Samza Task - */ - @Override - public void init(Config config, TaskContext context) throws Exception { - String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); - if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set , otherwise, assume we are running in - // local mode and ignore this - SystemsUtils.setHadoopConfigHome(yarnConfHome); - - String filename = config.get(SamzaConfigFactory.FILE_KEY); - String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); - - this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); - SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, this.getName()); - this.setOutputStream(wrapper.outputStream); - SamzaStream output = (SamzaStream)this.getOutputStream(); - if (output != null) // if output stream exists, set it up - output.onCreate(); - } - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - SamzaStream output = (SamzaStream)this.getOutputStream(); - if (output == null) return; // if there is no output stream, do nothing - output.setCollector(collector); - ContentEvent event = (ContentEvent) envelope.getMessage(); - output.put(event); - } - - /* - * Implementation of Samza's SystemConsumer to get events from source - * and feed to SAMOA system - * - */ - /* Current implementation: buffer the incoming events and send a batch - * of them when poll() is called by Samza system. - * - * Currently: it has a "soft" limit on the size of the buffer: - * when the buffer size reaches the limit, the reading thread will sleep - * for 100ms. - * A hard limit can be achieved by overriding the method - * protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() - * of BlockingEnvelopeMap - * But then we have handle the case when the queue is full. - * - */ - public static class SamoaSystemConsumer extends BlockingEnvelopeMap { - - private EntranceProcessor entranceProcessor = null; - private SystemStreamPartition systemStreamPartition; - - private static final Logger logger = LoggerFactory.getLogger(SamoaSystemConsumer.class); - - public SamoaSystemConsumer(String systemName, Config config) { - String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); - if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set , otherwise, assume we are running in - // local mode and ignore this - SystemsUtils.setHadoopConfigHome(yarnConfHome); - - String filename = config.get(SamzaConfigFactory.FILE_KEY); - String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); - String name = config.get(SamzaConfigFactory.JOB_NAME_KEY); - SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, name); - - this.entranceProcessor = wrapper.processor; - this.entranceProcessor.onCreate(0); - - // Internal stream from SystemConsumer to EntranceTask, so we - // need only one partition - this.systemStreamPartition = new SystemStreamPartition(systemName, wrapper.name, new Partition(0)); - } - - @Override - public void start() { - Thread processorPollingThread = new Thread( - new Runnable() { - @Override - public void run() { - try { - pollingEntranceProcessor(); - setIsAtHead(systemStreamPartition, true); - } catch (InterruptedException e) { - e.getStackTrace(); - stop(); - } - } - } - ); - - processorPollingThread.start(); - } - - @Override - public void stop() { - - } - - private void pollingEntranceProcessor() throws InterruptedException { - int messageCnt = 0; - while(!this.entranceProcessor.isFinished()) { - messageCnt = this.getNumMessagesInQueue(systemStreamPartition); - if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft limit on the size of the queue - this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition,null, null,this.entranceProcessor.nextEvent())); - } else { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - break; - } - } - } - - // Send last event - this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition,null, null,this.entranceProcessor.nextEvent())); - } - - } + private static final long serialVersionUID = 313907132721414634L; + + private EntranceProcessor processor; + private SamzaStream outputStream; + private String name; + + public SerializationProxy(SamzaEntranceProcessingItem epi) { + this.processor = epi.getProcessor(); + this.outputStream = (SamzaStream) epi.getOutputStream(); + this.name = epi.getName(); + } + } + + /* + * Implement Samza Task + */ + @Override + public void init(Config config, TaskContext context) throws Exception { + String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); + if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is + // set , otherwise, + // assume we are + // running in + // local mode and ignore this + SystemsUtils.setHadoopConfigHome(yarnConfHome); + + String filename = config.get(SamzaConfigFactory.FILE_KEY); + String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); + + this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); + SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, + filename, this.getName()); + this.setOutputStream(wrapper.outputStream); + SamzaStream output = (SamzaStream) this.getOutputStream(); + if (output != null) // if output stream exists, set it up + output.onCreate(); + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + SamzaStream output = (SamzaStream) this.getOutputStream(); + if (output == null) + return; // if there is no output stream, do nothing + output.setCollector(collector); + ContentEvent event = (ContentEvent) envelope.getMessage(); + output.put(event); + } + + /* + * Implementation of Samza's SystemConsumer to get events from source and feed + * to SAMOA system + */ + /* + * Current implementation: buffer the incoming events and send a batch of them + * when poll() is called by Samza system. + * + * Currently: it has a "soft" limit on the size of the buffer: when the buffer + * size reaches the limit, the reading thread will sleep for 100ms. A hard + * limit can be achieved by overriding the method protected + * BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() of + * BlockingEnvelopeMap But then we have handle the case when the queue is + * full. + */ + public static class SamoaSystemConsumer extends BlockingEnvelopeMap { + + private EntranceProcessor entranceProcessor = null; + private SystemStreamPartition systemStreamPartition; + + private static final Logger logger = LoggerFactory.getLogger(SamoaSystemConsumer.class); + + public SamoaSystemConsumer(String systemName, Config config) { + String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); + if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property + // is set , + // otherwise, + // assume we are + // running in + // local mode and ignore this + SystemsUtils.setHadoopConfigHome(yarnConfHome); + + String filename = config.get(SamzaConfigFactory.FILE_KEY); + String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); + String name = config.get(SamzaConfigFactory.JOB_NAME_KEY); + SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, + filename, name); + + this.entranceProcessor = wrapper.processor; + this.entranceProcessor.onCreate(0); + + // Internal stream from SystemConsumer to EntranceTask, so we + // need only one partition + this.systemStreamPartition = new SystemStreamPartition(systemName, wrapper.name, new Partition(0)); + } + + @Override + public void start() { + Thread processorPollingThread = new Thread( + new Runnable() { + @Override + public void run() { + try { + pollingEntranceProcessor(); + setIsAtHead(systemStreamPartition, true); + } catch (InterruptedException e) { + e.getStackTrace(); + stop(); + } + } + } + ); + + processorPollingThread.start(); + } + + @Override + public void stop() { + + } + + private void pollingEntranceProcessor() throws InterruptedException { + int messageCnt = 0; + while (!this.entranceProcessor.isFinished()) { + messageCnt = this.getNumMessagesInQueue(systemStreamPartition); + if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft + // limit + // on the + // size of + // the + // queue + this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, + this.entranceProcessor.nextEvent())); + } else { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + } + + // Send last event + this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, + this.entranceProcessor.nextEvent())); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java index db72e7c..7c97e65 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java @@ -46,120 +46,127 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; /** - * ProcessingItem for Samza - * which is also a Samza task (StreamTask and InitableTask) + * ProcessingItem for Samza which is also a Samza task (StreamTask and + * InitableTask) * * @author Anh Thu Vu */ -public class SamzaProcessingItem extends AbstractProcessingItem - implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { - - /** +public class SamzaProcessingItem extends AbstractProcessingItem + implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { + + /** * */ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - private Set<SamzaSystemStream> inputStreams; // input streams: system.stream - private List<SamzaStream> outputStreams; - - /* - * Constructors - */ - // Need this so Samza can initialize a StreamTask - public SamzaProcessingItem() {} - - /* - * Implement com.yahoo.labs.samoa.topology.ProcessingItem - */ - public SamzaProcessingItem(Processor processor, int parallelismHint) { - super(processor, parallelismHint); - this.inputStreams = new HashSet<SamzaSystemStream>(); - this.outputStreams = new LinkedList<SamzaStream>(); - } - - /* - * Simple setters, getters - */ - public Set<SamzaSystemStream> getInputStreams() { - return this.inputStreams; - } - - /* - * Extends AbstractProcessingItem - */ - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this,this.getParallelism(),scheme)); - this.inputStreams.add(stream); - return this; - } - - /* - * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode - */ - @Override - public int addOutputStream(SamzaStream stream) { - this.outputStreams.add(stream); - return this.outputStreams.size(); - } - - public List<SamzaStream> getOutputStreams() { - return this.outputStreams; - } - - /* - * Implement Samza task - */ - @Override - public void init(Config config, TaskContext context) throws Exception { - String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); - if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set , otherwise, assume we are running in - // local mode and ignore this - SystemsUtils.setHadoopConfigHome(yarnConfHome); - - String filename = config.get(SamzaConfigFactory.FILE_KEY); - String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); - this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); - SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, this.getName()); - this.setProcessor(wrapper.processor); - this.outputStreams = wrapper.outputStreams; - - // Init Processor and Streams - this.getProcessor().onCreate(0); - for (SamzaStream stream:this.outputStreams) { - stream.onCreate(); - } - - } - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - for (SamzaStream stream:this.outputStreams) { - stream.setCollector(collector); - } - this.getProcessor().process((ContentEvent) envelope.getMessage()); - } - - /* - * SerializationProxy - */ - private Object writeReplace() { - return new SerializationProxy(this); - } - - private static class SerializationProxy implements Serializable { - /** + private Set<SamzaSystemStream> inputStreams; // input streams: system.stream + private List<SamzaStream> outputStreams; + + /* + * Constructors + */ + // Need this so Samza can initialize a StreamTask + public SamzaProcessingItem() { + } + + /* + * Implement com.yahoo.labs.samoa.topology.ProcessingItem + */ + public SamzaProcessingItem(Processor processor, int parallelismHint) { + super(processor, parallelismHint); + this.inputStreams = new HashSet<SamzaSystemStream>(); + this.outputStreams = new LinkedList<SamzaStream>(); + } + + /* + * Simple setters, getters + */ + public Set<SamzaSystemStream> getInputStreams() { + return this.inputStreams; + } + + /* + * Extends AbstractProcessingItem + */ + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this, this + .getParallelism(), scheme)); + this.inputStreams.add(stream); + return this; + } + + /* + * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode + */ + @Override + public int addOutputStream(SamzaStream stream) { + this.outputStreams.add(stream); + return this.outputStreams.size(); + } + + public List<SamzaStream> getOutputStreams() { + return this.outputStreams; + } + + /* + * Implement Samza task + */ + @Override + public void init(Config config, TaskContext context) throws Exception { + String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); + if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is + // set , otherwise, + // assume we are + // running in + // local mode and ignore this + SystemsUtils.setHadoopConfigHome(yarnConfHome); + + String filename = config.get(SamzaConfigFactory.FILE_KEY); + String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); + this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); + SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, + filename, this.getName()); + this.setProcessor(wrapper.processor); + this.outputStreams = wrapper.outputStreams; + + // Init Processor and Streams + this.getProcessor().onCreate(0); + for (SamzaStream stream : this.outputStreams) { + stream.onCreate(); + } + + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + for (SamzaStream stream : this.outputStreams) { + stream.setCollector(collector); + } + this.getProcessor().process((ContentEvent) envelope.getMessage()); + } + + /* + * SerializationProxy + */ + private Object writeReplace() { + return new SerializationProxy(this); + } + + private static class SerializationProxy implements Serializable { + /** * */ - private static final long serialVersionUID = 1534643987559070336L; - - private Processor processor; - private List<SamzaStream> outputStreams; - - public SerializationProxy(SamzaProcessingItem pi) { - this.processor = pi.getProcessor(); - this.outputStreams = pi.getOutputStreams(); - } - } + private static final long serialVersionUID = 1534643987559070336L; + + private Processor processor; + private List<SamzaStream> outputStreams; + + public SerializationProxy(SamzaProcessingItem pi) { + this.processor = pi.getProcessor(); + this.outputStreams = pi.getOutputStreams(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java index be13673..1dbccb6 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java @@ -23,34 +23,36 @@ package com.yahoo.labs.samoa.topology.impl; import com.yahoo.labs.samoa.topology.IProcessingItem; /** - * Common interface of SamzaEntranceProcessingItem and - * SamzaProcessingItem + * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem * * @author Anh Thu Vu */ public interface SamzaProcessingNode extends IProcessingItem { - /** - * Registers an output stream with this processing item - * - * @param stream - * the output stream - * @return the number of output streams of this processing item - */ - public int addOutputStream(SamzaStream stream); - - /** - * Gets the name/id of this processing item - * - * @return the name/id of this processing item - */ - // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI - public String getName(); - - /** - * Sets the name/id for this processing item - * @param name - * the name/id of this processing item - */ - // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI - public void setName(String name); + /** + * Registers an output stream with this processing item + * + * @param stream + * the output stream + * @return the number of output streams of this processing item + */ + public int addOutputStream(SamzaStream stream); + + /** + * Gets the name/id of this processing item + * + * @return the name/id of this processing item + */ + // TODO: include getName() and setName() in IProcessingItem and/or + // AbstractEPI/PI + public String getName(); + + /** + * Sets the name/id for this processing item + * + * @param name + * the name/id of this processing item + */ + // TODO: include getName() and setName() in IProcessingItem and/or + // AbstractEPI/PI + public void setName(String name); } \ No newline at end of file
