http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/assembly/samoa-s4.xml ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/assembly/samoa-s4.xml b/samoa-s4/src/main/assembly/samoa-s4.xml new file mode 100644 index 0000000..8e5614a --- /dev/null +++ b/samoa-s4/src/main/assembly/samoa-s4.xml @@ -0,0 +1,64 @@ +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 Yahoo! Inc. + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + #L% + --> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>dist</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <!-- SAMOA API artifacts --> + <fileSet> + <outputDirectory>lib/</outputDirectory> + <directory>../samoa-api/target/lib/</directory> + <includes> + <include>*</include> + </includes> + </fileSet> + <fileSet> + <outputDirectory>app/</outputDirectory> + <directory>../samoa-api/target/</directory> + <includes> + <include>samoa-api-*.jar</include> + </includes> + </fileSet> + + <!-- SAMOA S4 artifacts --> + <fileSet> + <outputDirectory>app/</outputDirectory> + <directory>target/</directory> + <includes> + <include>samoa-s4-*.jar</include> + </includes> + </fileSet> + <fileSet> + <outputDirectory>/</outputDirectory> + <directory>target/</directory> + <includes> + <include>lib/*</include> + </includes> + </fileSet> + </fileSets> + +</assembly> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java new file mode 100644 index 0000000..33299ac --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java @@ -0,0 +1,97 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.IProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; + +/** + * S4 Platform Component Factory + * + * @author severien + * + */ +public class S4ComponentFactory implements ComponentFactory { + + public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class); + protected S4DoTask app; + + @Override + public ProcessingItem createPi(Processor processor, int paralellism) { + S4ProcessingItem processingItem = new S4ProcessingItem(app); + // TODO refactor how to set the paralellism level + processingItem.setParalellismLevel(paralellism); + processingItem.setProcessor(processor); + + return processingItem; + } + + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + // TODO Create source Entry processing item that connects to an external stream + S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app); + entrancePi.setParallelism(1); // FIXME should not be set to 1 statically + return entrancePi; + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + S4Stream aStream = new S4Stream(app); + return aStream; + } + + @Override + public Topology createTopology(String topoName) { + return new S4Topology(topoName); + } + + /** + * Initialization method. + * + * @param evalTask + */ + public void init(String evalTask) { + // Task is initiated in the DoTaskApp + } + + /** + * Sets S4 application. + * + * @param app + */ + public void setApp(S4DoTask app) { + this.app = app; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java new file mode 100644 index 0000000..0f474a4 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java @@ -0,0 +1,263 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.s4.base.Event; +import org.apache.s4.base.KeyFinder; +import org.apache.s4.core.App; +import org.apache.s4.core.ProcessingElement; +import org.apache.s4.core.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.javacliparser.Option; +import com.github.javacliparser.ClassOption; +import com.yahoo.labs.samoa.core.Globals; +import com.yahoo.labs.samoa.tasks.Task; +import com.yahoo.labs.samoa.topology.ComponentFactory; + +import com.google.inject.Inject; +import com.google.inject.name.Named; + +/* + * S4 App that runs samoa Tasks + * + * */ + +/** + * The Class DoTaskApp. + */ +final public class S4DoTask extends App { + + private final Logger logger = LoggerFactory.getLogger(S4DoTask.class); + Task task; + + @Inject @Named("evalTask") public String evalTask; + + public S4DoTask() { + super(); + } + + /** The engine. */ + protected ComponentFactory componentFactory; + + /** + * Gets the factory. + * + * @return the factory + */ + public ComponentFactory getFactory() { + return componentFactory; + } + + /** + * Sets the factory. + * + * @param factory + * the new factory + */ + public void setFactory(ComponentFactory factory) { + this.componentFactory = factory; + } + + /* + * Build the application + * + * @see org.apache.s4.core.App#onInit() + */ + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onInit() + */ + @Override + protected void onInit() { + logger.info("DoTaskApp onInit"); + // ConsoleReporters prints S4 metrics + // MetricsRegistry mr = new MetricsRegistry(); + // + // CsvReporter.enable(new File(System.getProperty("user.home") + // + "/monitor/"), 10, TimeUnit.SECONDS); + // ConsoleReporter.enable(10, TimeUnit.SECONDS); + try { + System.err.println(); + System.err.println(Globals.getWorkbenchInfoString()); + System.err.println(); + + } catch (Exception ex) { + ex.printStackTrace(); + } + S4ComponentFactory factory = new S4ComponentFactory(); + factory.setApp(this); + + // logger.debug("LC {}", lc); + + // task = TaskProvider.getTask(evalTask); + + // EXAMPLE OPTIONS + // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K + // 5 -N 0.0) + // String[] args = new String[] {evalTask,"-l", "Clustream","-g", + // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents", + // "-K", "5", "-N", "0.0)"}; + // String[] args = new String[] { evalTask, "-l", "clustream.Clustream", + // "-g", "clustream.Clustream", "-i", "100000", "-s", + // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" }; + logger.debug("PARAMETERS {}", evalTask); + // params = params.replace(":", " "); + List<String> parameters = new ArrayList<String>(); + // parameters.add(evalTask); + try { + parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" "))); + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); + } + String[] args = parameters.toArray(new String[0]); + Option[] extraOptions = new Option[] {}; + // build a single string by concatenating cli options + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + + // parse options + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); + task.setFactory(factory); + task.init(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onStart() + */ + @Override + protected void onStart() { + logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId()); + // <<<<<<< HEAD Task doesn't have start in latest storm-impl + // TODO change the way the app starts + // if (this.getPartitionId() == 0) + S4Topology s4topology = (S4Topology) getTask().getTopology(); + S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem(); + while (epi.injectNextEvent()) + // inject events from the EntrancePI + ; + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onClose() + */ + @Override + protected void onClose() { + System.out.println("Closing DoTaskApp..."); + + } + + /** + * Gets the task. + * + * @return the task + */ + public Task getTask() { + return task; + } + + // These methods are protected in App and can not be accessed from outside. + // They are + // called from parallel classifiers and evaluations. Is there a better way + // to do that? + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createPE(java.lang.Class) + */ + @Override + public <T extends ProcessingElement> T createPE(Class<T> type) { + return super.createPE(type); + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[]) + */ + @Override + public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, ProcessingElement... processingElements) { + return super.createStream(name, finder, processingElements); + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.core.ProcessingElement[]) + */ + @Override + public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) { + return super.createStream(name, processingElements); + } + + // @com.beust.jcommander.Parameters(separators = "=") + // class Parameters { + // + // @Parameter(names={"-lc","-local"}, description="Local clustering method") + // private String localClustering; + // + // @Parameter(names={"-gc","-global"}, + // description="Global clustering method") + // private String globalClustering; + // + // } + // + // class ParametersConverter {// implements IStringConverter<String[]> { + // + // + // public String[] convertToArgs(String value) { + // + // String[] params = value.split(","); + // String[] args = new String[params.length*2]; + // for(int i=0; i<params.length ; i++) { + // args[i] = params[i].split("=")[0]; + // args[i+1] = params[i].split("=")[1]; + // i++; + // } + // return args; + // } + // + // } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java new file mode 100644 index 0000000..2b0c595 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java @@ -0,0 +1,120 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.s4.core.App; +import org.apache.s4.core.ProcessingElement; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; + +// TODO adapt this entrance processing item to connect to external streams so the application doesnt need to use an AdapterApp + +public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem { + + private EntranceProcessor entranceProcessor; + // private S4DoTask app; + private int parallelism; + protected Stream outputStream; + + /** + * Constructor of an S4 entrance processing item. + * + * @param app + * : S4 application + */ + public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) { + super(app); + this.entranceProcessor = entranceProcessor; + // this.app = (S4DoTask) app; + // this.setSingleton(true); + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public int getParallelism() { + return this.parallelism; + } + + @Override + public EntranceProcessor getProcessor() { + return this.entranceProcessor; + } + + // + // @Override + // public void put(Instance inst) { + // // do nothing + // // may not needed + // } + + @Override + protected void onCreate() { + // was commented + if (this.entranceProcessor != null) { + // TODO revisit if we need to change it to a clone() call + this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor); + this.entranceProcessor.onCreate(Integer.parseInt(getId())); + } + } + + @Override + protected void onRemove() { + // do nothing + } + + // + // /** + // * Sets the entrance processing item processor. + // * + // * @param processor + // */ + // public void setProcessor(Processor processor) { + // this.entranceProcessor = processor; + // } + + @Override + public void setName(String name) { + super.setName(name); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + if (this.outputStream != null) + throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once"); + this.outputStream = stream; + return this; + } + + public boolean injectNextEvent() { + if (entranceProcessor.hasNext()) { + ContentEvent nextEvent = this.entranceProcessor.nextEvent(); + outputStream.put(nextEvent); + return entranceProcessor.hasNext(); + } else + return false; + // return !nextEvent.isLastEvent(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java new file mode 100644 index 0000000..8f8ad9f --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java @@ -0,0 +1,90 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import net.jcip.annotations.Immutable; + +import org.apache.s4.base.Event; + +import com.yahoo.labs.samoa.core.ContentEvent; + +/** + * The Class InstanceEvent. + */ +@Immutable +final public class S4Event extends Event { + + private String key; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + /** The content event. */ + private ContentEvent contentEvent; + + /** + * Instantiates a new instance event. + */ + public S4Event() { + // Needed for serialization of kryo + } + + /** + * Instantiates a new instance event. + * + * @param contentEvent the content event + */ + public S4Event(ContentEvent contentEvent) { + if (contentEvent != null) { + this.contentEvent = contentEvent; + this.key = contentEvent.getKey(); + + } + } + + /** + * Gets the content event. + * + * @return the content event + */ + public ContentEvent getContentEvent() { + return contentEvent; + } + + /** + * Sets the content event. + * + * @param contentEvent the new content event + */ + public void setContentEvent(ContentEvent contentEvent) { + this.contentEvent = contentEvent; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java new file mode 100644 index 0000000..1351159 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java @@ -0,0 +1,188 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.s4.base.KeyFinder; +import org.apache.s4.core.App; +import org.apache.s4.core.ProcessingElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; + +/** + * S4 Platform platform specific processing item, inherits from S4 ProcessinElemnt. + * + * @author severien + * + */ +public class S4ProcessingItem extends ProcessingElement implements + ProcessingItem { + + public static final Logger logger = LoggerFactory + .getLogger(S4ProcessingItem.class); + + private Processor processor; + private int paralellismLevel; + private S4DoTask app; + + private static final String NAME="PROCESSING-ITEM-"; + private static int OBJ_COUNTER=0; + + /** + * Constructor of S4 ProcessingItem. + * + * @param app : S4 application + */ + public S4ProcessingItem(App app) { + super(app); + super.setName(NAME+OBJ_COUNTER); + OBJ_COUNTER++; + this.app = (S4DoTask) app; + this.paralellismLevel = 1; + } + + @Override + public String getName() { + return super.getName(); + } + + /** + * Gets processing item paralellism level. + * + * @return int + */ + public int getParalellismLevel() { + return paralellismLevel; + } + + /** + * Sets processing item paralellism level. + * + * @param paralellismLevel + */ + public void setParalellismLevel(int paralellismLevel) { + this.paralellismLevel = paralellismLevel; + } + + /** + * onEvent method. + * + * @param event + */ + public void onEvent(S4Event event) { + if (processor.process(event.getContentEvent()) == true) { + close(); + } + } + + /** + * Sets S4 processing item processor. + * + * @param processor + */ + public void setProcessor(Processor processor) { + this.processor = processor; + } + + // Methods from ProcessingItem + @Override + public Processor getProcessor() { + return processor; + } + + /** + * KeyFinder sets the keys for a specific event. + * + * @return KeyFinder + */ + private KeyFinder<S4Event> getKeyFinder() { + KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() { + @Override + public List<String> get(S4Event s4event) { + List<String> results = new ArrayList<String>(); + results.add(s4event.getKey()); + return results; + } + }; + + return keyFinder; + } + + + @Override + public ProcessingItem connectInputAllStream(Stream inputStream) { + + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.BROADCAST); + return this; + } + + + @Override + public ProcessingItem connectInputKeyStream(Stream inputStream) { + + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this,S4Stream.GROUP_BY_KEY); + + return this; + } + + @Override + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this,S4Stream.SHUFFLE); + + return this; + } + + // Methods from ProcessingElement + @Override + protected void onCreate() { + logger.debug("PE ID {}", getId()); + if (this.processor != null) { + this.processor = this.processor.newProcessor(this.processor); + this.processor.onCreate(Integer.parseInt(getId())); + } + } + + @Override + protected void onRemove() { + // do nothing + } + + @Override + public int getParallelism() { + return this.paralellismLevel; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java new file mode 100644 index 0000000..78a3266 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java @@ -0,0 +1,185 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.s4.base.KeyFinder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.topology.AbstractStream; + +/** + * S4 Platform specific stream. + * + * @author severien + * + */ +public class S4Stream extends AbstractStream { + + public static final int SHUFFLE = 0; + public static final int GROUP_BY_KEY = 1; + public static final int BROADCAST = 2; + + private static final Logger logger = LoggerFactory.getLogger(S4Stream.class); + + private S4DoTask app; + private int processingItemParalellism; + private int shuffleCounter; + + private static final String NAME = "STREAM-"; + private static int OBJ_COUNTER = 0; + + /* The stream list */ + public List<StreamType> streams; + + public S4Stream(S4DoTask app) { + super(); + this.app = app; + this.processingItemParalellism = 1; + this.shuffleCounter = 0; + this.streams = new ArrayList<StreamType>(); + this.setStreamId(NAME+OBJ_COUNTER); + OBJ_COUNTER++; + } + + public S4Stream(S4DoTask app, S4ProcessingItem pi) { + super(); + this.app = app; + this.processingItemParalellism = 1; + this.shuffleCounter = 0; + this.streams = new ArrayList<StreamType>(); + this.setStreamId(NAME+OBJ_COUNTER); + OBJ_COUNTER++; + + } + + /** + * + * @return + */ + public int getParallelism() { + return processingItemParalellism; + } + + public void setParallelism(int parallelism) { + this.processingItemParalellism = parallelism; + } + + public void addStream(String streamID, KeyFinder<S4Event> finder, + S4ProcessingItem pi, int type) { + String streamName = streamID +"_"+pi.getName(); + org.apache.s4.core.Stream<S4Event> stream = this.app.createStream( + streamName, pi); + stream.setName(streamName); + logger.debug("Stream name S4Stream {}", streamName); + if (finder != null) + stream.setKey(finder); + this.streams.add(new StreamType(stream, type)); + + } + + @Override + public void put(ContentEvent event) { + + for (int i = 0; i < streams.size(); i++) { + + switch (streams.get(i).getType()) { + case SHUFFLE: + S4Event s4event = new S4Event(event); + s4event.setStreamId(streams.get(i).getStream().getName()); + if(getParallelism() == 1) { + s4event.setKey("0"); + }else { + s4event.setKey(Integer.toString(shuffleCounter)); + } + streams.get(i).getStream().put(s4event); + shuffleCounter++; + if (shuffleCounter >= (getParallelism())) { + shuffleCounter = 0; + } + + break; + + case GROUP_BY_KEY: + S4Event s4event1 = new S4Event(event); + s4event1.setStreamId(streams.get(i).getStream().getName()); + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + String key = Integer.toString(hb.build() % getParallelism()); + s4event1.setKey(key); + streams.get(i).getStream().put(s4event1); + break; + + case BROADCAST: + for (int p = 0; p < this.getParallelism(); p++) { + S4Event s4event2 = new S4Event(event); + s4event2.setStreamId(streams.get(i).getStream().getName()); + s4event2.setKey(Integer.toString(p)); + streams.get(i).getStream().put(s4event2); + } + break; + + default: + break; + } + + + } + + } + + /** + * Subclass for definig stream connection type + * @author severien + * + */ + class StreamType { + org.apache.s4.core.Stream<S4Event> stream; + int type; + + public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) { + this.stream = s; + this.type = t; + } + + public org.apache.s4.core.Stream<S4Event> getStream() { + return stream; + } + + public void setStream(org.apache.s4.core.Stream<S4Event> stream) { + this.stream = stream; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java new file mode 100644 index 0000000..c7ef92c --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java @@ -0,0 +1,146 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.s4.core.util.AppConfig; +import org.apache.s4.core.util.ParsingUtils; +import org.apache.s4.deploy.DeploymentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.tasks.Task; +import com.yahoo.labs.samoa.topology.ISubmitter; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; + +public class S4Submitter implements ISubmitter { + + private static Logger logger = LoggerFactory.getLogger(S4Submitter.class); + + @Override + public void deployTask(Task task) { + // TODO: Get application FROM HTTP server + // TODO: Initializa a http server to serve the app package + + String appURIString = null; +// File app = new File(System.getProperty("user.dir") +// + "/src/site/dist/SAMOA-S4-0.1-dist.jar"); + + // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar + try { + URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar"); + appURIString = appURL.toString(); + } catch (MalformedURLException e1) { + e1.printStackTrace(); + } + +// try { +// appURIString = app.toURI().toURL().toString(); +// } catch (MalformedURLException e) { +// e.printStackTrace(); +// } + if (task == null) { + logger.error("Can't execute since evaluation task is not set!"); + return; + } else { + logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ", + task.getClass().getSimpleName(), appURIString); + } + + String[] args = { "-c=testCluster2", + "-appClass=" + S4DoTask.class.getName(), + "-appName=" + "samoaApp", + "-p=evalTask=" + task.getClass().getSimpleName(), + "-zk=localhost:2181", "-s4r=" + appURIString , "-emc=" + SamoaSerializerModule.class.getName()}; + // "-emc=" + S4MOAModule.class.getName(), + // "@" + + // Resources.getResource("s4moa.properties").getFile(), + + S4Config s4config = new S4Config(); + JCommander jc = new JCommander(s4config); + jc.parse(args); + + Map<String, String> namedParameters = new HashMap<String, String>(); + for (String parameter : s4config.namedParameters) { + String[] param = parameter.split("="); + namedParameters.put(param[0], param[1]); + } + + AppConfig config = new AppConfig.Builder() + .appClassName(s4config.appClass).appName(s4config.appName) + .appURI(s4config.appURI).namedParameters(namedParameters) + .build(); + + DeploymentUtils.initAppConfig(config, s4config.clusterName, true, + s4config.zkString); + + System.out.println("Suposedly deployed on S4"); + } + + + public void initHTTPServer() { + + } + + @Parameters(separators = "=") + public static class S4Config { + + @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true) + String clusterName = null; + + @Parameter(names = "-appClass", description = "Main App class", required = false) + String appClass = null; + + @Parameter(names = "-appName", description = "Application name", required = false) + String appName = null; + + @Parameter(names = "-s4r", description = "Application URI", required = false) + String appURI = null; + + @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false) + String zkString = null; + + @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false) + List<String> extraModules = new ArrayList<String>(); + + @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration " + + "parameters, taking precedence over homonymous configuration parameters from configuration files. " + + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class) + List<String> namedParameters = new ArrayList<String>(); + + } + + @Override + public void setLocal(boolean bool) { + // TODO S4 works the same for local and distributed environments + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java new file mode 100644 index 0000000..6bef0e8 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java @@ -0,0 +1,61 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.AbstractTopology; + +public class S4Topology extends AbstractTopology { + + // CASEY: it seems evaluationTask is not used. + // Remove it for now + +// private String _evaluationTask; + +// S4Topology(String topoName, String evalTask) { +// super(topoName); +// } +// +// S4Topology(String topoName) { +// this(topoName, null); +// } + +// @Override +// public void setEvaluationTask(String evalTask) { +// _evaluationTask = evalTask; +// } +// +// @Override +// public String getEvaluationTask() { +// return _evaluationTask; +// } + + S4Topology(String topoName) { + super(topoName); + } + + protected EntranceProcessingItem getEntranceProcessingItem() { + if (this.getEntranceProcessingItems() == null) return null; + if (this.getEntranceProcessingItems().size() < 1) return null; + // TODO: support multiple entrance PIs + return (EntranceProcessingItem)this.getEntranceProcessingItems().toArray()[0]; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java new file mode 100644 index 0000000..4ae2296 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java @@ -0,0 +1,99 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.nio.ByteBuffer; + +import org.apache.s4.base.SerializerDeserializer; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import com.yahoo.labs.samoa.learners.classifiers.trees.AttributeContentEvent; +import com.yahoo.labs.samoa.learners.classifiers.trees.ComputeContentEvent; + +public class SamoaSerializer implements SerializerDeserializer{ + + private ThreadLocal<Kryo> kryoThreadLocal; + private ThreadLocal<Output> outputThreadLocal; + + private int initialBufferSize = 2048; + private int maxBufferSize = 256 * 1024; + + public void setMaxBufferSize(int maxBufferSize) { + this.maxBufferSize = maxBufferSize; + } + + /** + * + * @param classLoader + * classloader able to handle classes to serialize/deserialize. For instance, application-level events + * can only be handled by the application classloader. + */ + @Inject + public SamoaSerializer(@Assisted final ClassLoader classLoader) { + kryoThreadLocal = new ThreadLocal<Kryo>() { + + @Override + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + kryo.setClassLoader(classLoader); + kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer()); + kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer()); + kryo.setRegistrationRequired(false); + return kryo; + } + }; + + outputThreadLocal = new ThreadLocal<Output>() { + @Override + protected Output initialValue() { + Output output = new Output(initialBufferSize, maxBufferSize); + return output; + } + }; + + } + + @Override + public Object deserialize(ByteBuffer rawMessage) { + Input input = new Input(rawMessage.array()); + try { + return kryoThreadLocal.get().readClassAndObject(input); + } finally { + input.close(); + } + } + + @SuppressWarnings("resource") + @Override + public ByteBuffer serialize(Object message) { + Output output = outputThreadLocal.get(); + try { + kryoThreadLocal.get().writeClassAndObject(output, message); + return ByteBuffer.wrap(output.toBytes()); + } finally { + output.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java new file mode 100644 index 0000000..311e449 --- /dev/null +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java @@ -0,0 +1,35 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.s4.base.SerializerDeserializer; + +import com.google.inject.AbstractModule; + +public class SamoaSerializerModule extends AbstractModule { + + @Override + protected void configure() { + bind(SerializerDeserializer.class).to(SamoaSerializer.class); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-samza/pom.xml b/samoa-samza/pom.xml new file mode 100644 index 0000000..8fd60a7 --- /dev/null +++ b/samoa-samza/pom.xml @@ -0,0 +1,168 @@ +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 Yahoo! Inc. + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + #L% + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-samza</name> + <description>Samza engine for SAMOA</description> + + <artifactId>samoa-samza</artifactId> + <parent> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j-log4j12.version}</version> + </dependency> + + <!--<dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>11.0.2</version> + </dependency> --> + + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-api</artifactId> + <version>${samza.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-core_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-serializers_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + + <!--<dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-shell</artifactId> + <classifier>dist</classifier> + <type>tgz</type> + <version>${samza.version}</version> + </dependency>--> + + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-yarn_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-kafka_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>0.4</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.1</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- SAMOA assembly --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven-assembly-plugin.version}</version> + <configuration> + + <descriptors> + <descriptor>src/main/assembly/samoa-samza.xml</descriptor> + </descriptors> + <finalName>SAMOA-Samza-${project.version}</finalName> + <appendAssemblyId>false</appendAssemblyId> + <attach>false</attach> + <outputDirectory>../target</outputDirectory> + <!-- + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + --> + <archive> + <manifestEntries> + <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version> + <Bundle-Description>${project.description}</Bundle-Description> + <Implementation-Version>${project.version}</Implementation-Version> + <Implementation-Vendor>Yahoo Labs</Implementation-Vendor> + <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id> + </manifestEntries> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/assembly/samoa-samza.xml ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/assembly/samoa-samza.xml b/samoa-samza/src/main/assembly/samoa-samza.xml new file mode 100644 index 0000000..ead1c0a --- /dev/null +++ b/samoa-samza/src/main/assembly/samoa-samza.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 - 2014 Yahoo! Inc. + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + #L% + --> + +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>dist</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/..</directory> + <includes> + <include>README*</include> + <include>LICENSE*</include> + <include>NOTICE*</include> + </includes> + </fileSet> + <fileSet> + <outputDirectory>bin</outputDirectory> + <directory>${basedir}/../bin/samza-dist</directory> + <includes> + <include>run-class.sh</include> + <include>run-am.sh</include> + <include>run-container.sh</include> + <include>run-job.sh</include> + </includes> + </fileSet> + </fileSets> + <files> + <file> + <outputDirectory>lib</outputDirectory> + <source>${basedir}/src/main/resources/log4j.xml</source> + </file> + </files> + <dependencySets> + <!-- + <dependencySet> + <outputDirectory>bin</outputDirectory> + <includes> + <include>org.apache.samza:samza-shell:tgz:dist:*</include> + </includes> + <fileMode>0744</fileMode> + <unpack>true</unpack> + </dependencySet> + --> + <dependencySet> + <unpack>true</unpack> + <includes> + <include>org.slf4j:slf4j-log4j12</include> + <include>org.apache.samza:samza-api</include> + <include>org.apache.samza:samza-core_2.10</include> + <include>org.apache.samza:samza-serializers_2.10</include> + <include>org.apache.samza:samza-yarn_2.10</include> + <include>org.apache.samza:samza-kafka_2.10</include> + <include>org.apache.kafka:kafka_2.10</include> + </includes> + <useTransitiveFiltering>true</useTransitiveFiltering> + <useTransitiveDependencies>true</useTransitiveDependencies> + </dependencySet> + <dependencySet> + <unpack>true</unpack> + <includes> + <include>${groupId}:samoa-api</include> + <include>${groupId}:${artifactId}</include> + </includes> + <unpackOptions> + <excludes> + <exclude>META-INF/services/org.apache.hadoop.security.*</exclude> + </excludes> + </unpackOptions> + <useTransitiveFiltering>true</useTransitiveFiltering> + <useTransitiveDependencies>true</useTransitiveDependencies> + </dependencySet> + </dependencySets> +</assembly> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java new file mode 100644 index 0000000..45dd901 --- /dev/null +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java @@ -0,0 +1,227 @@ +package com.yahoo.labs.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; +import com.yahoo.labs.samoa.tasks.Task; +import com.yahoo.labs.samoa.topology.impl.SamzaComponentFactory; +import com.yahoo.labs.samoa.topology.impl.SamzaEngine; +import com.yahoo.labs.samoa.topology.impl.SamzaTopology; +import com.yahoo.labs.samoa.utils.SystemsUtils; + +/** + * Main class to run the task on Samza + * + * @author Anh Thu Vu + */ +public class SamzaDoTask { + + private static final Logger logger = LoggerFactory.getLogger(SamzaDoTask.class); + + private static final String LOCAL_MODE = "local"; + private static final String YARN_MODE = "yarn"; + + // FLAGS + private static final String YARN_CONF_FLAG = "--yarn_home"; + private static final String MODE_FLAG = "--mode"; + private static final String ZK_FLAG = "--zookeeper"; + private static final String KAFKA_FLAG = "--kafka"; + private static final String KAFKA_REPLICATION_FLAG = "--kafka_replication_factor"; + private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency"; + private static final String JAR_PACKAGE_FLAG = "--jar_package"; + private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir"; + private static final String AM_MEMORY_FLAG = "--yarn_am_mem"; + private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem"; + private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container"; + + private static final String KRYO_REGISTER_FLAG = "--kryo_register"; + + // config values + private static int kafkaReplicationFactor = 1; + private static int checkpointFrequency = 60000; + private static String kafka = "localhost:9092"; + private static String zookeeper = "localhost:2181"; + private static boolean isLocal = true; + private static String yarnConfHome = null; + private static String samoaHDFSDir = null; + private static String jarPackagePath = null; + private static int amMem = 1024; + private static int containerMem = 1024; + private static int piPerContainer = 2; + private static String kryoRegisterFile = null; + + /* + * 1. Read arguments + * 2. Construct topology/task + * 3. Upload the JAR to HDFS if we are running on YARN + * 4. Submit topology to SamzaEngine + */ + public static void main(String[] args) { + // Read arguments + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + parseArguments(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + // Init Task + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task = null; + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new SamzaComponentFactory()); + task.init(); + + // Upload JAR file to HDFS + String hdfsPath = null; + if (!isLocal) { + Path path = FileSystems.getDefault().getPath(jarPackagePath); + hdfsPath = uploadJarToHDFS(path.toFile()); + if(hdfsPath == null) { + System.out.println("Fail uploading JAR file \""+path.toAbsolutePath().toString()+"\" to HDFS."); + return; + } + } + + // Set parameters + SamzaEngine.getEngine() + .setLocalMode(isLocal) + .setZooKeeper(zookeeper) + .setKafka(kafka) + .setYarnPackage(hdfsPath) + .setKafkaReplicationFactor(kafkaReplicationFactor) + .setConfigHome(yarnConfHome) + .setAMMemory(amMem) + .setContainerMemory(containerMem) + .setPiPerContainerRatio(piPerContainer) + .setKryoRegisterFile(kryoRegisterFile) + .setCheckpointFrequency(checkpointFrequency); + + // Submit topology + SamzaEngine.submitTopology((SamzaTopology)task.getTopology()); + + } + + private static boolean isLocalMode(String mode) { + return mode.equals(LOCAL_MODE); + } + + private static void parseArguments(List<String> args) { + for (int i=args.size()-1; i>=0; i--) { + String arg = args.get(i).trim(); + String[] splitted = arg.split("=",2); + + if (splitted.length >= 2) { + // YARN config folder which contains conf/core-site.xml, + // conf/hdfs-site.xml, conf/yarn-site.xml + if (splitted[0].equals(YARN_CONF_FLAG)) { + yarnConfHome = splitted[1]; + args.remove(i); + } + // host:port for zookeeper cluster + else if (splitted[0].equals(ZK_FLAG)) { + zookeeper = splitted[1]; + args.remove(i); + } + // host:port,... for kafka broker(s) + else if (splitted[0].equals(KAFKA_FLAG)) { + kafka = splitted[1]; + args.remove(i); + } + // whether we are running Samza in Local mode or YARN mode + else if (splitted[0].equals(MODE_FLAG)) { + isLocal = isLocalMode(splitted[1]); + args.remove(i); + } + // memory requirement for YARN application master + else if (splitted[0].equals(AM_MEMORY_FLAG)) { + amMem = Integer.parseInt(splitted[1]); + args.remove(i); + } + // memory requirement for YARN worker container + else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) { + containerMem = Integer.parseInt(splitted[1]); + args.remove(i); + } + // the path to JAR archive that we need to upload to HDFS + else if (splitted[0].equals(JAR_PACKAGE_FLAG)) { + jarPackagePath = splitted[1]; + args.remove(i); + } + // the HDFS dir for SAMOA files + else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) { + samoaHDFSDir = splitted[1]; + if (samoaHDFSDir.length() < 1) samoaHDFSDir = null; + args.remove(i); + } + // number of max PI instances per container + // this will be used to compute the number of containers + // AM will request for the job + else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) { + piPerContainer = Integer.parseInt(splitted[1]); + args.remove(i); + } + // kafka streams replication factor + else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) { + kafkaReplicationFactor = Integer.parseInt(splitted[1]); + args.remove(i); + } + // checkpoint frequency in ms + else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) { + checkpointFrequency = Integer.parseInt(splitted[1]); + args.remove(i); + } + // the file contains registration information for Kryo serializer + else if (splitted[0].equals(KRYO_REGISTER_FLAG)) { + kryoRegisterFile = splitted[1]; + args.remove(i); + } + } + } + } + + private static String uploadJarToHDFS(File file) { + SystemsUtils.setHadoopConfigHome(yarnConfHome); + SystemsUtils.setSAMOADir(samoaHDFSDir); + return SystemsUtils.copyToHDFS(file, file.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java new file mode 100644 index 0000000..362e0a5 --- /dev/null +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java @@ -0,0 +1,57 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; + +import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer; + +/** + * Implementation of Samza's SystemFactory + * Samza will use this factory to get our custom consumer + * which gets the events from SAMOA EntranceProcessor + * and feed them to EntranceProcessingItem task + * + * @author Anh Thu Vu + */ +public class SamoaSystemFactory implements SystemFactory { + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + return new SinglePartitionWithoutOffsetsSystemAdmin(); + } + + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + return new SamoaSystemConsumer(systemName, config); + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new SamzaException("This implementation is not supposed to produce anything."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java new file mode 100644 index 0000000..d71d97b --- /dev/null +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java @@ -0,0 +1,62 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.IProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; + +/** + * Implementation of SAMOA ComponentFactory for Samza + * + * @author Anh Thu Vu + */ +public class SamzaComponentFactory implements ComponentFactory { + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new SamzaProcessingItem(processor, parallelism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new SamzaEntranceProcessingItem(entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + return new SamzaStream(sourcePi); + } + + @Override + public Topology createTopology(String topoName) { + return new SamzaTopology(topoName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java new file mode 100644 index 0000000..7339443 --- /dev/null +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java @@ -0,0 +1,197 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.List; +import java.util.Set; + +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.JobRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; +import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream; +import com.yahoo.labs.samoa.utils.SamzaConfigFactory; +import com.yahoo.labs.samoa.utils.SystemsUtils; + +/** + * This class will submit a list of Samza jobs with + * the Configs generated from the input topology + * + * @author Anh Thu Vu + * + */ +public class SamzaEngine { + + private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class); + + /* + * Singleton instance + */ + private static SamzaEngine engine = new SamzaEngine(); + + private String zookeeper; + private String kafka; + private int kafkaReplicationFactor; + private boolean isLocalMode; + private String yarnPackagePath; + private String yarnConfHome; + + private String kryoRegisterFile; + + private int amMem; + private int containerMem; + private int piPerContainerRatio; + + private int checkpointFrequency; + + private void _submitTopology(SamzaTopology topology) { + + // Setup SamzaConfigFactory + SamzaConfigFactory configFactory = new SamzaConfigFactory(); + configFactory.setLocalMode(isLocalMode) + .setZookeeper(zookeeper) + .setKafka(kafka) + .setYarnPackage(yarnPackagePath) + .setAMMemory(amMem) + .setContainerMemory(containerMem) + .setPiPerContainerRatio(piPerContainerRatio) + .setKryoRegisterFile(kryoRegisterFile) + .setCheckpointFrequency(checkpointFrequency) + .setReplicationFactor(kafkaReplicationFactor); + + // Generate the list of Configs + List<MapConfig> configs; + try { + // ConfigFactory generate a list of configs + // Serialize a map of PIs and store in a file in the jar at jarFilePath + // (in dat/ folder) + configs = configFactory.getMapConfigsForTopology(topology); + } catch (Exception e) { + e.printStackTrace(); + return; + } + + // Create kafka streams + Set<Stream> streams = topology.getStreams(); + for (Stream stream:streams) { + SamzaStream samzaStream = (SamzaStream) stream; + List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams(); + for (SamzaSystemStream systemStream:systemStreams) { + // all streams should be kafka streams + SystemsUtils.createKafkaTopic(systemStream.getStream(),systemStream.getParallelism(),kafkaReplicationFactor); + } + } + + // Submit the jobs with those configs + for (MapConfig config:configs) { + logger.info("Config:{}",config); + JobRunner jobRunner = new JobRunner(config); + jobRunner.run(); + } + } + + private void _setupSystemsUtils() { + // Setup Utils + if (!isLocalMode) + SystemsUtils.setHadoopConfigHome(yarnConfHome); + SystemsUtils.setZookeeper(zookeeper); + } + + /* + * Setter methods + */ + public static SamzaEngine getEngine() { + return engine; + } + + public SamzaEngine setZooKeeper(String zk) { + this.zookeeper = zk; + return this; + } + + public SamzaEngine setKafka(String kafka) { + this.kafka = kafka; + return this; + } + + public SamzaEngine setKafkaReplicationFactor(int replicationFactor) { + this.kafkaReplicationFactor = replicationFactor; + return this; + } + + public SamzaEngine setCheckpointFrequency(int freq) { + this.checkpointFrequency = freq; + return this; + } + + public SamzaEngine setLocalMode(boolean isLocal) { + this.isLocalMode = isLocal; + return this; + } + + public SamzaEngine setYarnPackage(String yarnPackagePath) { + this.yarnPackagePath = yarnPackagePath; + return this; + } + + public SamzaEngine setConfigHome(String configHome) { + this.yarnConfHome = configHome; + return this; + } + + public SamzaEngine setAMMemory(int mem) { + this.amMem = mem; + return this; + } + + public SamzaEngine setContainerMemory(int mem) { + this.containerMem = mem; + return this; + } + + public SamzaEngine setPiPerContainerRatio(int piPerContainer) { + this.piPerContainerRatio = piPerContainer; + return this; + } + + public SamzaEngine setKryoRegisterFile(String registerFile) { + this.kryoRegisterFile = registerFile; + return this; + } + + /** + * Submit a list of Samza jobs correspond to the submitted + * topology + * + * @param topo + * the submitted topology + */ + public static void submitTopology(SamzaTopology topo) { + // Setup SystemsUtils + engine._setupSystemsUtils(); + + // Submit topology + engine._submitTopology(topo); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java new file mode 100644 index 0000000..e89d789 --- /dev/null +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java @@ -0,0 +1,222 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.utils.SamzaConfigFactory; +import com.yahoo.labs.samoa.utils.SystemsUtils; + +/** + * EntranceProcessingItem for Samza + * which is also a Samza task (StreamTask & InitableTask) + * + * @author Anh Thu Vu + * + */ +public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem + implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { + + /** + * + */ + private static final long serialVersionUID = 7157734520046135039L; + + /* + * Constructors + */ + public SamzaEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // Need this so Samza can initialize a StreamTask + public SamzaEntranceProcessingItem() {} + + /* + * Simple setters, getters + */ + @Override + public int addOutputStream(SamzaStream stream) { + this.setOutputStream(stream); + return 1; // entrance PI should have only 1 output stream + } + + /* + * Serialization + */ + private Object writeReplace() { + return new SerializationProxy(this); + } + + private static class SerializationProxy implements Serializable { + /** + * + */ + private static final long serialVersionUID = 313907132721414634L; + + private EntranceProcessor processor; + private SamzaStream outputStream; + private String name; + + public SerializationProxy(SamzaEntranceProcessingItem epi) { + this.processor = epi.getProcessor(); + this.outputStream = (SamzaStream)epi.getOutputStream(); + this.name = epi.getName(); + } + } + + /* + * Implement Samza Task + */ + @Override + public void init(Config config, TaskContext context) throws Exception { + String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); + if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set , otherwise, assume we are running in + // local mode and ignore this + SystemsUtils.setHadoopConfigHome(yarnConfHome); + + String filename = config.get(SamzaConfigFactory.FILE_KEY); + String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); + + this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); + SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, this.getName()); + this.setOutputStream(wrapper.outputStream); + SamzaStream output = (SamzaStream)this.getOutputStream(); + if (output != null) // if output stream exists, set it up + output.onCreate(); + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { + SamzaStream output = (SamzaStream)this.getOutputStream(); + if (output == null) return; // if there is no output stream, do nothing + output.setCollector(collector); + ContentEvent event = (ContentEvent) envelope.getMessage(); + output.put(event); + } + + /* + * Implementation of Samza's SystemConsumer to get events from source + * and feed to SAMOA system + * + */ + /* Current implementation: buffer the incoming events and send a batch + * of them when poll() is called by Samza system. + * + * Currently: it has a "soft" limit on the size of the buffer: + * when the buffer size reaches the limit, the reading thread will sleep + * for 100ms. + * A hard limit can be achieved by overriding the method + * protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() + * of BlockingEnvelopeMap + * But then we have handle the case when the queue is full. + * + */ + public static class SamoaSystemConsumer extends BlockingEnvelopeMap { + + private EntranceProcessor entranceProcessor = null; + private SystemStreamPartition systemStreamPartition; + + private static final Logger logger = LoggerFactory.getLogger(SamoaSystemConsumer.class); + + public SamoaSystemConsumer(String systemName, Config config) { + String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); + if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set , otherwise, assume we are running in + // local mode and ignore this + SystemsUtils.setHadoopConfigHome(yarnConfHome); + + String filename = config.get(SamzaConfigFactory.FILE_KEY); + String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); + String name = config.get(SamzaConfigFactory.JOB_NAME_KEY); + SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, name); + + this.entranceProcessor = wrapper.processor; + this.entranceProcessor.onCreate(0); + + // Internal stream from SystemConsumer to EntranceTask, so we + // need only one partition + this.systemStreamPartition = new SystemStreamPartition(systemName, wrapper.name, new Partition(0)); + } + + @Override + public void start() { + Thread processorPollingThread = new Thread( + new Runnable() { + @Override + public void run() { + try { + pollingEntranceProcessor(); + setIsAtHead(systemStreamPartition, true); + } catch (InterruptedException e) { + e.getStackTrace(); + stop(); + } + } + } + ); + + processorPollingThread.start(); + } + + @Override + public void stop() { + + } + + private void pollingEntranceProcessor() throws InterruptedException { + int messageCnt = 0; + while(!this.entranceProcessor.isFinished()) { + messageCnt = this.getNumMessagesInQueue(systemStreamPartition); + if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft limit on the size of the queue + this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition,null, null,this.entranceProcessor.nextEvent())); + } else { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + } + + // Send last event + this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition,null, null,this.entranceProcessor.nextEvent())); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java new file mode 100644 index 0000000..db72e7c --- /dev/null +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java @@ -0,0 +1,165 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.Serializable; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.AbstractProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream; +import com.yahoo.labs.samoa.utils.PartitioningScheme; +import com.yahoo.labs.samoa.utils.SamzaConfigFactory; +import com.yahoo.labs.samoa.utils.SystemsUtils; +import com.yahoo.labs.samoa.utils.StreamDestination; + +import org.apache.samza.config.Config; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + +/** + * ProcessingItem for Samza + * which is also a Samza task (StreamTask and InitableTask) + * + * @author Anh Thu Vu + */ +public class SamzaProcessingItem extends AbstractProcessingItem + implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private Set<SamzaSystemStream> inputStreams; // input streams: system.stream + private List<SamzaStream> outputStreams; + + /* + * Constructors + */ + // Need this so Samza can initialize a StreamTask + public SamzaProcessingItem() {} + + /* + * Implement com.yahoo.labs.samoa.topology.ProcessingItem + */ + public SamzaProcessingItem(Processor processor, int parallelismHint) { + super(processor, parallelismHint); + this.inputStreams = new HashSet<SamzaSystemStream>(); + this.outputStreams = new LinkedList<SamzaStream>(); + } + + /* + * Simple setters, getters + */ + public Set<SamzaSystemStream> getInputStreams() { + return this.inputStreams; + } + + /* + * Extends AbstractProcessingItem + */ + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this,this.getParallelism(),scheme)); + this.inputStreams.add(stream); + return this; + } + + /* + * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode + */ + @Override + public int addOutputStream(SamzaStream stream) { + this.outputStreams.add(stream); + return this.outputStreams.size(); + } + + public List<SamzaStream> getOutputStreams() { + return this.outputStreams; + } + + /* + * Implement Samza task + */ + @Override + public void init(Config config, TaskContext context) throws Exception { + String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); + if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set , otherwise, assume we are running in + // local mode and ignore this + SystemsUtils.setHadoopConfigHome(yarnConfHome); + + String filename = config.get(SamzaConfigFactory.FILE_KEY); + String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); + this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); + SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, this.getName()); + this.setProcessor(wrapper.processor); + this.outputStreams = wrapper.outputStreams; + + // Init Processor and Streams + this.getProcessor().onCreate(0); + for (SamzaStream stream:this.outputStreams) { + stream.onCreate(); + } + + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { + for (SamzaStream stream:this.outputStreams) { + stream.setCollector(collector); + } + this.getProcessor().process((ContentEvent) envelope.getMessage()); + } + + /* + * SerializationProxy + */ + private Object writeReplace() { + return new SerializationProxy(this); + } + + private static class SerializationProxy implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1534643987559070336L; + + private Processor processor; + private List<SamzaStream> outputStreams; + + public SerializationProxy(SamzaProcessingItem pi) { + this.processor = pi.getProcessor(); + this.outputStreams = pi.getOutputStreams(); + } + } + +} \ No newline at end of file
