http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java deleted file mode 100644 index e1d4754..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java +++ /dev/null @@ -1,120 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; - -// TODO adapt this entrance processing item to connect to external streams so the application doesnt need to use an AdapterApp - -public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem { - - private EntranceProcessor entranceProcessor; - // private S4DoTask app; - private int parallelism; - protected Stream outputStream; - - /** - * Constructor of an S4 entrance processing item. - * - * @param app - * : S4 application - */ - public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) { - super(app); - this.entranceProcessor = entranceProcessor; - // this.app = (S4DoTask) app; - // this.setSingleton(true); - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public int getParallelism() { - return this.parallelism; - } - - @Override - public EntranceProcessor getProcessor() { - return this.entranceProcessor; - } - - // - // @Override - // public void put(Instance inst) { - // // do nothing - // // may not needed - // } - - @Override - protected void onCreate() { - // was commented - if (this.entranceProcessor != null) { - // TODO revisit if we need to change it to a clone() call - this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor); - this.entranceProcessor.onCreate(Integer.parseInt(getId())); - } - } - - @Override - protected void onRemove() { - // do nothing - } - - // - // /** - // * Sets the entrance processing item processor. - // * - // * @param processor - // */ - // public void setProcessor(Processor processor) { - // this.entranceProcessor = processor; - // } - - @Override - public void setName(String name) { - super.setName(name); - } - - @Override - public EntranceProcessingItem setOutputStream(Stream stream) { - if (this.outputStream != null) - throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once"); - this.outputStream = stream; - return this; - } - - public boolean injectNextEvent() { - if (entranceProcessor.hasNext()) { - ContentEvent nextEvent = this.entranceProcessor.nextEvent(); - outputStream.put(nextEvent); - return entranceProcessor.hasNext(); - } else - return false; - // return !nextEvent.isLastEvent(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java deleted file mode 100644 index 8ef137c..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * License - */ - -import net.jcip.annotations.Immutable; - -import org.apache.s4.base.Event; - -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * The Class InstanceEvent. - */ -@Immutable -final public class S4Event extends Event { - - private String key; - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - /** The content event. */ - private ContentEvent contentEvent; - - /** - * Instantiates a new instance event. - */ - public S4Event() { - // Needed for serialization of kryo - } - - /** - * Instantiates a new instance event. - * - * @param contentEvent - * the content event - */ - public S4Event(ContentEvent contentEvent) { - if (contentEvent != null) { - this.contentEvent = contentEvent; - this.key = contentEvent.getKey(); - - } - } - - /** - * Gets the content event. - * - * @return the content event - */ - public ContentEvent getContentEvent() { - return contentEvent; - } - - /** - * Sets the content event. - * - * @param contentEvent - * the new content event - */ - public void setContentEvent(ContentEvent contentEvent) { - this.contentEvent = contentEvent; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java deleted file mode 100644 index ea88094..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java +++ /dev/null @@ -1,187 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.apache.s4.base.KeyFinder; -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; - -/** - * S4 Platform platform specific processing item, inherits from S4 ProcessinElemnt. - * - * @author severien - * - */ -public class S4ProcessingItem extends ProcessingElement implements - ProcessingItem { - - public static final Logger logger = LoggerFactory - .getLogger(S4ProcessingItem.class); - - private Processor processor; - private int paralellismLevel; - private S4DoTask app; - - private static final String NAME = "PROCESSING-ITEM-"; - private static int OBJ_COUNTER = 0; - - /** - * Constructor of S4 ProcessingItem. - * - * @param app - * : S4 application - */ - public S4ProcessingItem(App app) { - super(app); - super.setName(NAME + OBJ_COUNTER); - OBJ_COUNTER++; - this.app = (S4DoTask) app; - this.paralellismLevel = 1; - } - - @Override - public String getName() { - return super.getName(); - } - - /** - * Gets processing item paralellism level. - * - * @return int - */ - public int getParalellismLevel() { - return paralellismLevel; - } - - /** - * Sets processing item paralellism level. - * - * @param paralellismLevel - */ - public void setParalellismLevel(int paralellismLevel) { - this.paralellismLevel = paralellismLevel; - } - - /** - * onEvent method. - * - * @param event - */ - public void onEvent(S4Event event) { - if (processor.process(event.getContentEvent()) == true) { - close(); - } - } - - /** - * Sets S4 processing item processor. - * - * @param processor - */ - public void setProcessor(Processor processor) { - this.processor = processor; - } - - // Methods from ProcessingItem - @Override - public Processor getProcessor() { - return processor; - } - - /** - * KeyFinder sets the keys for a specific event. - * - * @return KeyFinder - */ - private KeyFinder<S4Event> getKeyFinder() { - KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() { - @Override - public List<String> get(S4Event s4event) { - List<String> results = new ArrayList<String>(); - results.add(s4event.getKey()); - return results; - } - }; - - return keyFinder; - } - - @Override - public ProcessingItem connectInputAllStream(Stream inputStream) { - - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.BROADCAST); - return this; - } - - @Override - public ProcessingItem connectInputKeyStream(Stream inputStream) { - - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.GROUP_BY_KEY); - - return this; - } - - @Override - public ProcessingItem connectInputShuffleStream(Stream inputStream) { - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.SHUFFLE); - - return this; - } - - // Methods from ProcessingElement - @Override - protected void onCreate() { - logger.debug("PE ID {}", getId()); - if (this.processor != null) { - this.processor = this.processor.newProcessor(this.processor); - this.processor.onCreate(Integer.parseInt(getId())); - } - } - - @Override - protected void onRemove() { - // do nothing - } - - @Override - public int getParallelism() { - return this.paralellismLevel; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java deleted file mode 100644 index 46c979c..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.s4.base.KeyFinder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.topology.AbstractStream; - -/** - * S4 Platform specific stream. - * - * @author severien - * - */ -public class S4Stream extends AbstractStream { - - public static final int SHUFFLE = 0; - public static final int GROUP_BY_KEY = 1; - public static final int BROADCAST = 2; - - private static final Logger logger = LoggerFactory.getLogger(S4Stream.class); - - private S4DoTask app; - private int processingItemParalellism; - private int shuffleCounter; - - private static final String NAME = "STREAM-"; - private static int OBJ_COUNTER = 0; - - /* The stream list */ - public List<StreamType> streams; - - public S4Stream(S4DoTask app) { - super(); - this.app = app; - this.processingItemParalellism = 1; - this.shuffleCounter = 0; - this.streams = new ArrayList<StreamType>(); - this.setStreamId(NAME + OBJ_COUNTER); - OBJ_COUNTER++; - } - - public S4Stream(S4DoTask app, S4ProcessingItem pi) { - super(); - this.app = app; - this.processingItemParalellism = 1; - this.shuffleCounter = 0; - this.streams = new ArrayList<StreamType>(); - this.setStreamId(NAME + OBJ_COUNTER); - OBJ_COUNTER++; - - } - - /** - * - * @return - */ - public int getParallelism() { - return processingItemParalellism; - } - - public void setParallelism(int parallelism) { - this.processingItemParalellism = parallelism; - } - - public void addStream(String streamID, KeyFinder<S4Event> finder, - S4ProcessingItem pi, int type) { - String streamName = streamID + "_" + pi.getName(); - org.apache.s4.core.Stream<S4Event> stream = this.app.createStream( - streamName, pi); - stream.setName(streamName); - logger.debug("Stream name S4Stream {}", streamName); - if (finder != null) - stream.setKey(finder); - this.streams.add(new StreamType(stream, type)); - - } - - @Override - public void put(ContentEvent event) { - - for (int i = 0; i < streams.size(); i++) { - - switch (streams.get(i).getType()) { - case SHUFFLE: - S4Event s4event = new S4Event(event); - s4event.setStreamId(streams.get(i).getStream().getName()); - if (getParallelism() == 1) { - s4event.setKey("0"); - } else { - s4event.setKey(Integer.toString(shuffleCounter)); - } - streams.get(i).getStream().put(s4event); - shuffleCounter++; - if (shuffleCounter >= (getParallelism())) { - shuffleCounter = 0; - } - - break; - - case GROUP_BY_KEY: - S4Event s4event1 = new S4Event(event); - s4event1.setStreamId(streams.get(i).getStream().getName()); - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - String key = Integer.toString(hb.build() % getParallelism()); - s4event1.setKey(key); - streams.get(i).getStream().put(s4event1); - break; - - case BROADCAST: - for (int p = 0; p < this.getParallelism(); p++) { - S4Event s4event2 = new S4Event(event); - s4event2.setStreamId(streams.get(i).getStream().getName()); - s4event2.setKey(Integer.toString(p)); - streams.get(i).getStream().put(s4event2); - } - break; - - default: - break; - } - - } - - } - - /** - * Subclass for definig stream connection type - * - * @author severien - * - */ - class StreamType { - org.apache.s4.core.Stream<S4Event> stream; - int type; - - public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) { - this.stream = s; - this.type = t; - } - - public org.apache.s4.core.Stream<S4Event> getStream() { - return stream; - } - - public void setStream(org.apache.s4.core.Stream<S4Event> stream) { - this.stream = stream; - } - - public int getType() { - return type; - } - - public void setType(int type) { - this.type = type; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java deleted file mode 100644 index f6bca87..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.File; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.s4.core.util.AppConfig; -import org.apache.s4.core.util.ParsingUtils; -import org.apache.s4.deploy.DeploymentUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.tasks.Task; -import com.yahoo.labs.samoa.topology.ISubmitter; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; - -public class S4Submitter implements ISubmitter { - - private static Logger logger = LoggerFactory.getLogger(S4Submitter.class); - - @Override - public void deployTask(Task task) { - // TODO: Get application FROM HTTP server - // TODO: Initializa a http server to serve the app package - - String appURIString = null; - // File app = new File(System.getProperty("user.dir") - // + "/src/site/dist/SAMOA-S4-0.1-dist.jar"); - - // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar - try { - URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar"); - appURIString = appURL.toString(); - } catch (MalformedURLException e1) { - e1.printStackTrace(); - } - - // try { - // appURIString = app.toURI().toURL().toString(); - // } catch (MalformedURLException e) { - // e.printStackTrace(); - // } - if (task == null) { - logger.error("Can't execute since evaluation task is not set!"); - return; - } else { - logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ", - task.getClass().getSimpleName(), appURIString); - } - - String[] args = { "-c=testCluster2", - "-appClass=" + S4DoTask.class.getName(), - "-appName=" + "samoaApp", - "-p=evalTask=" + task.getClass().getSimpleName(), - "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + SamoaSerializerModule.class.getName() }; - // "-emc=" + S4MOAModule.class.getName(), - // "@" + - // Resources.getResource("s4moa.properties").getFile(), - - S4Config s4config = new S4Config(); - JCommander jc = new JCommander(s4config); - jc.parse(args); - - Map<String, String> namedParameters = new HashMap<String, String>(); - for (String parameter : s4config.namedParameters) { - String[] param = parameter.split("="); - namedParameters.put(param[0], param[1]); - } - - AppConfig config = new AppConfig.Builder() - .appClassName(s4config.appClass).appName(s4config.appName) - .appURI(s4config.appURI).namedParameters(namedParameters) - .build(); - - DeploymentUtils.initAppConfig(config, s4config.clusterName, true, - s4config.zkString); - - System.out.println("Suposedly deployed on S4"); - } - - public void initHTTPServer() { - - } - - @Parameters(separators = "=") - public static class S4Config { - - @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true) - String clusterName = null; - - @Parameter(names = "-appClass", description = "Main App class", required = false) - String appClass = null; - - @Parameter(names = "-appName", description = "Application name", required = false) - String appName = null; - - @Parameter(names = "-s4r", description = "Application URI", required = false) - String appURI = null; - - @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false) - String zkString = null; - - @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false) - List<String> extraModules = new ArrayList<String>(); - - @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration " - + "parameters, taking precedence over homonymous configuration parameters from configuration files. " - + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class) - List<String> namedParameters = new ArrayList<String>(); - - } - - @Override - public void setLocal(boolean bool) { - // TODO S4 works the same for local and distributed environments - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java deleted file mode 100644 index 5eb0ccf..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.AbstractTopology; - -public class S4Topology extends AbstractTopology { - - // CASEY: it seems evaluationTask is not used. - // Remove it for now - - // private String _evaluationTask; - - // S4Topology(String topoName, String evalTask) { - // super(topoName); - // } - // - // S4Topology(String topoName) { - // this(topoName, null); - // } - - // @Override - // public void setEvaluationTask(String evalTask) { - // _evaluationTask = evalTask; - // } - // - // @Override - // public String getEvaluationTask() { - // return _evaluationTask; - // } - - S4Topology(String topoName) { - super(topoName); - } - - protected EntranceProcessingItem getEntranceProcessingItem() { - if (this.getEntranceProcessingItems() == null) - return null; - if (this.getEntranceProcessingItems().size() < 1) - return null; - // TODO: support multiple entrance PIs - return (EntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java deleted file mode 100644 index b809284..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.nio.ByteBuffer; - -import org.apache.s4.base.SerializerDeserializer; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.yahoo.labs.samoa.learners.classifiers.trees.AttributeContentEvent; -import com.yahoo.labs.samoa.learners.classifiers.trees.ComputeContentEvent; - -public class SamoaSerializer implements SerializerDeserializer { - - private ThreadLocal<Kryo> kryoThreadLocal; - private ThreadLocal<Output> outputThreadLocal; - - private int initialBufferSize = 2048; - private int maxBufferSize = 256 * 1024; - - public void setMaxBufferSize(int maxBufferSize) { - this.maxBufferSize = maxBufferSize; - } - - /** - * - * @param classLoader - * classloader able to handle classes to serialize/deserialize. For instance, application-level events can - * only be handled by the application classloader. - */ - @Inject - public SamoaSerializer(@Assisted final ClassLoader classLoader) { - kryoThreadLocal = new ThreadLocal<Kryo>() { - - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - kryo.setClassLoader(classLoader); - kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer()); - kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer()); - kryo.setRegistrationRequired(false); - return kryo; - } - }; - - outputThreadLocal = new ThreadLocal<Output>() { - @Override - protected Output initialValue() { - Output output = new Output(initialBufferSize, maxBufferSize); - return output; - } - }; - - } - - @Override - public Object deserialize(ByteBuffer rawMessage) { - Input input = new Input(rawMessage.array()); - try { - return kryoThreadLocal.get().readClassAndObject(input); - } finally { - input.close(); - } - } - - @SuppressWarnings("resource") - @Override - public ByteBuffer serialize(Object message) { - Output output = outputThreadLocal.get(); - try { - kryoThreadLocal.get().writeClassAndObject(output, message); - return ByteBuffer.wrap(output.toBytes()); - } finally { - output.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java deleted file mode 100644 index d47b143..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.s4.base.SerializerDeserializer; - -import com.google.inject.AbstractModule; - -public class SamoaSerializerModule extends AbstractModule { - - @Override - protected void configure() { - bind(SerializerDeserializer.class).to(SamoaSerializer.class); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java new file mode 100644 index 0000000..ebd18f9 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java @@ -0,0 +1,97 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * S4 Platform Component Factory + * + * @author severien + * + */ +public class S4ComponentFactory implements ComponentFactory { + + public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class); + protected S4DoTask app; + + @Override + public ProcessingItem createPi(Processor processor, int paralellism) { + S4ProcessingItem processingItem = new S4ProcessingItem(app); + // TODO refactor how to set the paralellism level + processingItem.setParalellismLevel(paralellism); + processingItem.setProcessor(processor); + + return processingItem; + } + + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + // TODO Create source Entry processing item that connects to an external + // stream + S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app); + entrancePi.setParallelism(1); // FIXME should not be set to 1 statically + return entrancePi; + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + S4Stream aStream = new S4Stream(app); + return aStream; + } + + @Override + public Topology createTopology(String topoName) { + return new S4Topology(topoName); + } + + /** + * Initialization method. + * + * @param evalTask + */ + public void init(String evalTask) { + // Task is initiated in the DoTaskApp + } + + /** + * Sets S4 application. + * + * @param app + */ + public void setApp(S4DoTask app) { + this.app = app; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java new file mode 100644 index 0000000..d52e981 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java @@ -0,0 +1,268 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.s4.base.Event; +import org.apache.s4.base.KeyFinder; +import org.apache.s4.core.App; +import org.apache.s4.core.ProcessingElement; +import org.apache.s4.core.Stream; +import org.apache.samoa.core.Globals; +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.ComponentFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Option; +import com.github.javacliparser.ClassOption; +import com.google.inject.Inject; +import com.google.inject.name.Named; + +/* + * S4 App that runs samoa Tasks + * + * */ + +/** + * The Class DoTaskApp. + */ +final public class S4DoTask extends App { + + private final Logger logger = LoggerFactory.getLogger(S4DoTask.class); + Task task; + + @Inject + @Named("evalTask") + public String evalTask; + + public S4DoTask() { + super(); + } + + /** The engine. */ + protected ComponentFactory componentFactory; + + /** + * Gets the factory. + * + * @return the factory + */ + public ComponentFactory getFactory() { + return componentFactory; + } + + /** + * Sets the factory. + * + * @param factory + * the new factory + */ + public void setFactory(ComponentFactory factory) { + this.componentFactory = factory; + } + + /* + * Build the application + * + * @see org.apache.s4.core.App#onInit() + */ + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onInit() + */ + @Override + protected void onInit() { + logger.info("DoTaskApp onInit"); + // ConsoleReporters prints S4 metrics + // MetricsRegistry mr = new MetricsRegistry(); + // + // CsvReporter.enable(new File(System.getProperty("user.home") + // + "/monitor/"), 10, TimeUnit.SECONDS); + // ConsoleReporter.enable(10, TimeUnit.SECONDS); + try { + System.err.println(); + System.err.println(Globals.getWorkbenchInfoString()); + System.err.println(); + + } catch (Exception ex) { + ex.printStackTrace(); + } + S4ComponentFactory factory = new S4ComponentFactory(); + factory.setApp(this); + + // logger.debug("LC {}", lc); + + // task = TaskProvider.getTask(evalTask); + + // EXAMPLE OPTIONS + // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K + // 5 -N 0.0) + // String[] args = new String[] {evalTask,"-l", "Clustream","-g", + // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents", + // "-K", "5", "-N", "0.0)"}; + // String[] args = new String[] { evalTask, "-l", "clustream.Clustream", + // "-g", "clustream.Clustream", "-i", "100000", "-s", + // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" }; + logger.debug("PARAMETERS {}", evalTask); + // params = params.replace(":", " "); + List<String> parameters = new ArrayList<String>(); + // parameters.add(evalTask); + try { + parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" "))); + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); + } + String[] args = parameters.toArray(new String[0]); + Option[] extraOptions = new Option[] {}; + // build a single string by concatenating cli options + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + + // parse options + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); + task.setFactory(factory); + task.init(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onStart() + */ + @Override + protected void onStart() { + logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId()); + // <<<<<<< HEAD Task doesn't have start in latest storm-impl + // TODO change the way the app starts + // if (this.getPartitionId() == 0) + S4Topology s4topology = (S4Topology) getTask().getTopology(); + S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem(); + while (epi.injectNextEvent()) + // inject events from the EntrancePI + ; + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onClose() + */ + @Override + protected void onClose() { + System.out.println("Closing DoTaskApp..."); + + } + + /** + * Gets the task. + * + * @return the task + */ + public Task getTask() { + return task; + } + + // These methods are protected in App and can not be accessed from outside. + // They are + // called from parallel classifiers and evaluations. Is there a better way + // to do that? + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createPE(java.lang.Class) + */ + @Override + public <T extends ProcessingElement> T createPE(Class<T> type) { + return super.createPE(type); + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createStream(java.lang.String, + * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[]) + */ + @Override + public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, + ProcessingElement... processingElements) { + return super.createStream(name, finder, processingElements); + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createStream(java.lang.String, + * org.apache.s4.core.ProcessingElement[]) + */ + @Override + public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) { + return super.createStream(name, processingElements); + } + + // @com.beust.jcommander.Parameters(separators = "=") + // class Parameters { + // + // @Parameter(names={"-lc","-local"}, description="Local clustering method") + // private String localClustering; + // + // @Parameter(names={"-gc","-global"}, + // description="Global clustering method") + // private String globalClustering; + // + // } + // + // class ParametersConverter {// implements IStringConverter<String[]> { + // + // + // public String[] convertToArgs(String value) { + // + // String[] params = value.split(","); + // String[] args = new String[params.length*2]; + // for(int i=0; i<params.length ; i++) { + // args[i] = params[i].split("=")[0]; + // args[i+1] = params[i].split("=")[1]; + // i++; + // } + // return args; + // } + // + // } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java new file mode 100644 index 0000000..771cbc8 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java @@ -0,0 +1,119 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.s4.core.App; +import org.apache.s4.core.ProcessingElement; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.Stream; + +// TODO adapt this entrance processing item to connect to external streams so the application doesnt need to use an AdapterApp + +public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem { + + private EntranceProcessor entranceProcessor; + // private S4DoTask app; + private int parallelism; + protected Stream outputStream; + + /** + * Constructor of an S4 entrance processing item. + * + * @param app + * : S4 application + */ + public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) { + super(app); + this.entranceProcessor = entranceProcessor; + // this.app = (S4DoTask) app; + // this.setSingleton(true); + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public int getParallelism() { + return this.parallelism; + } + + @Override + public EntranceProcessor getProcessor() { + return this.entranceProcessor; + } + + // + // @Override + // public void put(Instance inst) { + // // do nothing + // // may not needed + // } + + @Override + protected void onCreate() { + // was commented + if (this.entranceProcessor != null) { + // TODO revisit if we need to change it to a clone() call + this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor); + this.entranceProcessor.onCreate(Integer.parseInt(getId())); + } + } + + @Override + protected void onRemove() { + // do nothing + } + + // + // /** + // * Sets the entrance processing item processor. + // * + // * @param processor + // */ + // public void setProcessor(Processor processor) { + // this.entranceProcessor = processor; + // } + + @Override + public void setName(String name) { + super.setName(name); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + if (this.outputStream != null) + throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once"); + this.outputStream = stream; + return this; + } + + public boolean injectNextEvent() { + if (entranceProcessor.hasNext()) { + ContentEvent nextEvent = this.entranceProcessor.nextEvent(); + outputStream.put(nextEvent); + return entranceProcessor.hasNext(); + } else + return false; + // return !nextEvent.isLastEvent(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java new file mode 100644 index 0000000..154715b --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java @@ -0,0 +1,91 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import net.jcip.annotations.Immutable; + +import org.apache.s4.base.Event; +import org.apache.samoa.core.ContentEvent; + +/** + * The Class InstanceEvent. + */ +@Immutable +final public class S4Event extends Event { + + private String key; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + /** The content event. */ + private ContentEvent contentEvent; + + /** + * Instantiates a new instance event. + */ + public S4Event() { + // Needed for serialization of kryo + } + + /** + * Instantiates a new instance event. + * + * @param contentEvent + * the content event + */ + public S4Event(ContentEvent contentEvent) { + if (contentEvent != null) { + this.contentEvent = contentEvent; + this.key = contentEvent.getKey(); + + } + } + + /** + * Gets the content event. + * + * @return the content event + */ + public ContentEvent getContentEvent() { + return contentEvent; + } + + /** + * Sets the content event. + * + * @param contentEvent + * the new content event + */ + public void setContentEvent(ContentEvent contentEvent) { + this.contentEvent = contentEvent; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java new file mode 100644 index 0000000..b9c7467 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java @@ -0,0 +1,186 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.s4.base.KeyFinder; +import org.apache.s4.core.App; +import org.apache.s4.core.ProcessingElement; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * S4 Platform platform specific processing item, inherits from S4 ProcessinElemnt. + * + * @author severien + * + */ +public class S4ProcessingItem extends ProcessingElement implements + ProcessingItem { + + public static final Logger logger = LoggerFactory + .getLogger(S4ProcessingItem.class); + + private Processor processor; + private int paralellismLevel; + private S4DoTask app; + + private static final String NAME = "PROCESSING-ITEM-"; + private static int OBJ_COUNTER = 0; + + /** + * Constructor of S4 ProcessingItem. + * + * @param app + * : S4 application + */ + public S4ProcessingItem(App app) { + super(app); + super.setName(NAME + OBJ_COUNTER); + OBJ_COUNTER++; + this.app = (S4DoTask) app; + this.paralellismLevel = 1; + } + + @Override + public String getName() { + return super.getName(); + } + + /** + * Gets processing item paralellism level. + * + * @return int + */ + public int getParalellismLevel() { + return paralellismLevel; + } + + /** + * Sets processing item paralellism level. + * + * @param paralellismLevel + */ + public void setParalellismLevel(int paralellismLevel) { + this.paralellismLevel = paralellismLevel; + } + + /** + * onEvent method. + * + * @param event + */ + public void onEvent(S4Event event) { + if (processor.process(event.getContentEvent()) == true) { + close(); + } + } + + /** + * Sets S4 processing item processor. + * + * @param processor + */ + public void setProcessor(Processor processor) { + this.processor = processor; + } + + // Methods from ProcessingItem + @Override + public Processor getProcessor() { + return processor; + } + + /** + * KeyFinder sets the keys for a specific event. + * + * @return KeyFinder + */ + private KeyFinder<S4Event> getKeyFinder() { + KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() { + @Override + public List<String> get(S4Event s4event) { + List<String> results = new ArrayList<String>(); + results.add(s4event.getKey()); + return results; + } + }; + + return keyFinder; + } + + @Override + public ProcessingItem connectInputAllStream(Stream inputStream) { + + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.BROADCAST); + return this; + } + + @Override + public ProcessingItem connectInputKeyStream(Stream inputStream) { + + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.GROUP_BY_KEY); + + return this; + } + + @Override + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + S4Stream stream = (S4Stream) inputStream; + stream.setParallelism(this.paralellismLevel); + stream.addStream(inputStream.getStreamId(), + getKeyFinder(), this, S4Stream.SHUFFLE); + + return this; + } + + // Methods from ProcessingElement + @Override + protected void onCreate() { + logger.debug("PE ID {}", getId()); + if (this.processor != null) { + this.processor = this.processor.newProcessor(this.processor); + this.processor.onCreate(Integer.parseInt(getId())); + } + } + + @Override + protected void onRemove() { + // do nothing + } + + @Override + public int getParallelism() { + return this.paralellismLevel; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java new file mode 100644 index 0000000..734462e --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java @@ -0,0 +1,184 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.s4.base.KeyFinder; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * S4 Platform specific stream. + * + * @author severien + * + */ +public class S4Stream extends AbstractStream { + + public static final int SHUFFLE = 0; + public static final int GROUP_BY_KEY = 1; + public static final int BROADCAST = 2; + + private static final Logger logger = LoggerFactory.getLogger(S4Stream.class); + + private S4DoTask app; + private int processingItemParalellism; + private int shuffleCounter; + + private static final String NAME = "STREAM-"; + private static int OBJ_COUNTER = 0; + + /* The stream list */ + public List<StreamType> streams; + + public S4Stream(S4DoTask app) { + super(); + this.app = app; + this.processingItemParalellism = 1; + this.shuffleCounter = 0; + this.streams = new ArrayList<StreamType>(); + this.setStreamId(NAME + OBJ_COUNTER); + OBJ_COUNTER++; + } + + public S4Stream(S4DoTask app, S4ProcessingItem pi) { + super(); + this.app = app; + this.processingItemParalellism = 1; + this.shuffleCounter = 0; + this.streams = new ArrayList<StreamType>(); + this.setStreamId(NAME + OBJ_COUNTER); + OBJ_COUNTER++; + + } + + /** + * + * @return + */ + public int getParallelism() { + return processingItemParalellism; + } + + public void setParallelism(int parallelism) { + this.processingItemParalellism = parallelism; + } + + public void addStream(String streamID, KeyFinder<S4Event> finder, + S4ProcessingItem pi, int type) { + String streamName = streamID + "_" + pi.getName(); + org.apache.s4.core.Stream<S4Event> stream = this.app.createStream( + streamName, pi); + stream.setName(streamName); + logger.debug("Stream name S4Stream {}", streamName); + if (finder != null) + stream.setKey(finder); + this.streams.add(new StreamType(stream, type)); + + } + + @Override + public void put(ContentEvent event) { + + for (int i = 0; i < streams.size(); i++) { + + switch (streams.get(i).getType()) { + case SHUFFLE: + S4Event s4event = new S4Event(event); + s4event.setStreamId(streams.get(i).getStream().getName()); + if (getParallelism() == 1) { + s4event.setKey("0"); + } else { + s4event.setKey(Integer.toString(shuffleCounter)); + } + streams.get(i).getStream().put(s4event); + shuffleCounter++; + if (shuffleCounter >= (getParallelism())) { + shuffleCounter = 0; + } + + break; + + case GROUP_BY_KEY: + S4Event s4event1 = new S4Event(event); + s4event1.setStreamId(streams.get(i).getStream().getName()); + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + String key = Integer.toString(hb.build() % getParallelism()); + s4event1.setKey(key); + streams.get(i).getStream().put(s4event1); + break; + + case BROADCAST: + for (int p = 0; p < this.getParallelism(); p++) { + S4Event s4event2 = new S4Event(event); + s4event2.setStreamId(streams.get(i).getStream().getName()); + s4event2.setKey(Integer.toString(p)); + streams.get(i).getStream().put(s4event2); + } + break; + + default: + break; + } + + } + + } + + /** + * Subclass for definig stream connection type + * + * @author severien + * + */ + class StreamType { + org.apache.s4.core.Stream<S4Event> stream; + int type; + + public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) { + this.stream = s; + this.type = t; + } + + public org.apache.s4.core.Stream<S4Event> getStream() { + return stream; + } + + public void setStream(org.apache.s4.core.Stream<S4Event> stream) { + this.stream = stream; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java new file mode 100644 index 0000000..22807a6 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java @@ -0,0 +1,144 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.s4.core.util.AppConfig; +import org.apache.s4.core.util.ParsingUtils; +import org.apache.s4.deploy.DeploymentUtils; +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.ISubmitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; + +public class S4Submitter implements ISubmitter { + + private static Logger logger = LoggerFactory.getLogger(S4Submitter.class); + + @Override + public void deployTask(Task task) { + // TODO: Get application FROM HTTP server + // TODO: Initializa a http server to serve the app package + + String appURIString = null; + // File app = new File(System.getProperty("user.dir") + // + "/src/site/dist/SAMOA-S4-0.1-dist.jar"); + + // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar + try { + URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar"); + appURIString = appURL.toString(); + } catch (MalformedURLException e1) { + e1.printStackTrace(); + } + + // try { + // appURIString = app.toURI().toURL().toString(); + // } catch (MalformedURLException e) { + // e.printStackTrace(); + // } + if (task == null) { + logger.error("Can't execute since evaluation task is not set!"); + return; + } else { + logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ", + task.getClass().getSimpleName(), appURIString); + } + + String[] args = { "-c=testCluster2", + "-appClass=" + S4DoTask.class.getName(), + "-appName=" + "samoaApp", + "-p=evalTask=" + task.getClass().getSimpleName(), + "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + SamoaSerializerModule.class.getName() }; + // "-emc=" + S4MOAModule.class.getName(), + // "@" + + // Resources.getResource("s4moa.properties").getFile(), + + S4Config s4config = new S4Config(); + JCommander jc = new JCommander(s4config); + jc.parse(args); + + Map<String, String> namedParameters = new HashMap<String, String>(); + for (String parameter : s4config.namedParameters) { + String[] param = parameter.split("="); + namedParameters.put(param[0], param[1]); + } + + AppConfig config = new AppConfig.Builder() + .appClassName(s4config.appClass).appName(s4config.appName) + .appURI(s4config.appURI).namedParameters(namedParameters) + .build(); + + DeploymentUtils.initAppConfig(config, s4config.clusterName, true, + s4config.zkString); + + System.out.println("Suposedly deployed on S4"); + } + + public void initHTTPServer() { + + } + + @Parameters(separators = "=") + public static class S4Config { + + @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true) + String clusterName = null; + + @Parameter(names = "-appClass", description = "Main App class", required = false) + String appClass = null; + + @Parameter(names = "-appName", description = "Application name", required = false) + String appName = null; + + @Parameter(names = "-s4r", description = "Application URI", required = false) + String appURI = null; + + @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false) + String zkString = null; + + @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false) + List<String> extraModules = new ArrayList<String>(); + + @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration " + + "parameters, taking precedence over homonymous configuration parameters from configuration files. " + + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class) + List<String> namedParameters = new ArrayList<String>(); + + } + + @Override + public void setLocal(boolean bool) { + // TODO S4 works the same for local and distributed environments + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java new file mode 100644 index 0000000..413cfda --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java @@ -0,0 +1,63 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.AbstractTopology; +import org.apache.samoa.topology.EntranceProcessingItem; + +public class S4Topology extends AbstractTopology { + + // CASEY: it seems evaluationTask is not used. + // Remove it for now + + // private String _evaluationTask; + + // S4Topology(String topoName, String evalTask) { + // super(topoName); + // } + // + // S4Topology(String topoName) { + // this(topoName, null); + // } + + // @Override + // public void setEvaluationTask(String evalTask) { + // _evaluationTask = evalTask; + // } + // + // @Override + // public String getEvaluationTask() { + // return _evaluationTask; + // } + + S4Topology(String topoName) { + super(topoName); + } + + protected EntranceProcessingItem getEntranceProcessingItem() { + if (this.getEntranceProcessingItems() == null) + return null; + if (this.getEntranceProcessingItems().size() < 1) + return null; + // TODO: support multiple entrance PIs + return (EntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java new file mode 100644 index 0000000..9f9f144 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java @@ -0,0 +1,99 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.nio.ByteBuffer; + +import org.apache.s4.base.SerializerDeserializer; +import org.apache.samoa.learners.classifiers.trees.AttributeContentEvent; +import org.apache.samoa.learners.classifiers.trees.ComputeContentEvent; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +public class SamoaSerializer implements SerializerDeserializer { + + private ThreadLocal<Kryo> kryoThreadLocal; + private ThreadLocal<Output> outputThreadLocal; + + private int initialBufferSize = 2048; + private int maxBufferSize = 256 * 1024; + + public void setMaxBufferSize(int maxBufferSize) { + this.maxBufferSize = maxBufferSize; + } + + /** + * + * @param classLoader + * classloader able to handle classes to serialize/deserialize. For instance, application-level events can + * only be handled by the application classloader. + */ + @Inject + public SamoaSerializer(@Assisted final ClassLoader classLoader) { + kryoThreadLocal = new ThreadLocal<Kryo>() { + + @Override + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + kryo.setClassLoader(classLoader); + kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer()); + kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer()); + kryo.setRegistrationRequired(false); + return kryo; + } + }; + + outputThreadLocal = new ThreadLocal<Output>() { + @Override + protected Output initialValue() { + Output output = new Output(initialBufferSize, maxBufferSize); + return output; + } + }; + + } + + @Override + public Object deserialize(ByteBuffer rawMessage) { + Input input = new Input(rawMessage.array()); + try { + return kryoThreadLocal.get().readClassAndObject(input); + } finally { + input.close(); + } + } + + @SuppressWarnings("resource") + @Override + public ByteBuffer serialize(Object message) { + Output output = outputThreadLocal.get(); + try { + kryoThreadLocal.get().writeClassAndObject(output, message); + return ByteBuffer.wrap(output.toBytes()); + } finally { + output.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java new file mode 100644 index 0000000..e530a09 --- /dev/null +++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java @@ -0,0 +1,35 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.s4.base.SerializerDeserializer; + +import com.google.inject.AbstractModule; + +public class SamoaSerializerModule extends AbstractModule { + + @Override + protected void configure() { + bind(SerializerDeserializer.class).to(SamoaSerializer.class); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-samza/pom.xml b/samoa-samza/pom.xml index 0a04c65..1344031 100644 --- a/samoa-samza/pom.xml +++ b/samoa-samza/pom.xml @@ -31,14 +31,14 @@ <artifactId>samoa-samza</artifactId> <parent> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa</artifactId> <version>0.3.0-SNAPSHOT</version> </parent> <dependencies> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-api</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java deleted file mode 100644 index 8f90478..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java +++ /dev/null @@ -1,226 +0,0 @@ -package com.yahoo.labs.samoa; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.File; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.ClassOption; -import com.yahoo.labs.samoa.tasks.Task; -import com.yahoo.labs.samoa.topology.impl.SamzaComponentFactory; -import com.yahoo.labs.samoa.topology.impl.SamzaEngine; -import com.yahoo.labs.samoa.topology.impl.SamzaTopology; -import com.yahoo.labs.samoa.utils.SystemsUtils; - -/** - * Main class to run the task on Samza - * - * @author Anh Thu Vu - */ -public class SamzaDoTask { - - private static final Logger logger = LoggerFactory.getLogger(SamzaDoTask.class); - - private static final String LOCAL_MODE = "local"; - private static final String YARN_MODE = "yarn"; - - // FLAGS - private static final String YARN_CONF_FLAG = "--yarn_home"; - private static final String MODE_FLAG = "--mode"; - private static final String ZK_FLAG = "--zookeeper"; - private static final String KAFKA_FLAG = "--kafka"; - private static final String KAFKA_REPLICATION_FLAG = "--kafka_replication_factor"; - private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency"; - private static final String JAR_PACKAGE_FLAG = "--jar_package"; - private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir"; - private static final String AM_MEMORY_FLAG = "--yarn_am_mem"; - private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem"; - private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container"; - - private static final String KRYO_REGISTER_FLAG = "--kryo_register"; - - // config values - private static int kafkaReplicationFactor = 1; - private static int checkpointFrequency = 60000; - private static String kafka = "localhost:9092"; - private static String zookeeper = "localhost:2181"; - private static boolean isLocal = true; - private static String yarnConfHome = null; - private static String samoaHDFSDir = null; - private static String jarPackagePath = null; - private static int amMem = 1024; - private static int containerMem = 1024; - private static int piPerContainer = 2; - private static String kryoRegisterFile = null; - - /* - * 1. Read arguments 2. Construct topology/task 3. Upload the JAR to HDFS if - * we are running on YARN 4. Submit topology to SamzaEngine - */ - public static void main(String[] args) { - // Read arguments - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - parseArguments(tmpArgs); - - args = tmpArgs.toArray(new String[0]); - - // Init Task - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } - logger.debug("Command line string = {}", cliString.toString()); - System.out.println("Command line string = " + cliString.toString()); - - Task task = null; - try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); - logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); - } catch (Exception e) { - logger.error("Fail to initialize the task", e); - System.out.println("Fail to initialize the task" + e); - return; - } - task.setFactory(new SamzaComponentFactory()); - task.init(); - - // Upload JAR file to HDFS - String hdfsPath = null; - if (!isLocal) { - Path path = FileSystems.getDefault().getPath(jarPackagePath); - hdfsPath = uploadJarToHDFS(path.toFile()); - if (hdfsPath == null) { - System.out.println("Fail uploading JAR file \"" + path.toAbsolutePath().toString() + "\" to HDFS."); - return; - } - } - - // Set parameters - SamzaEngine.getEngine() - .setLocalMode(isLocal) - .setZooKeeper(zookeeper) - .setKafka(kafka) - .setYarnPackage(hdfsPath) - .setKafkaReplicationFactor(kafkaReplicationFactor) - .setConfigHome(yarnConfHome) - .setAMMemory(amMem) - .setContainerMemory(containerMem) - .setPiPerContainerRatio(piPerContainer) - .setKryoRegisterFile(kryoRegisterFile) - .setCheckpointFrequency(checkpointFrequency); - - // Submit topology - SamzaEngine.submitTopology((SamzaTopology) task.getTopology()); - - } - - private static boolean isLocalMode(String mode) { - return mode.equals(LOCAL_MODE); - } - - private static void parseArguments(List<String> args) { - for (int i = args.size() - 1; i >= 0; i--) { - String arg = args.get(i).trim(); - String[] splitted = arg.split("=", 2); - - if (splitted.length >= 2) { - // YARN config folder which contains conf/core-site.xml, - // conf/hdfs-site.xml, conf/yarn-site.xml - if (splitted[0].equals(YARN_CONF_FLAG)) { - yarnConfHome = splitted[1]; - args.remove(i); - } - // host:port for zookeeper cluster - else if (splitted[0].equals(ZK_FLAG)) { - zookeeper = splitted[1]; - args.remove(i); - } - // host:port,... for kafka broker(s) - else if (splitted[0].equals(KAFKA_FLAG)) { - kafka = splitted[1]; - args.remove(i); - } - // whether we are running Samza in Local mode or YARN mode - else if (splitted[0].equals(MODE_FLAG)) { - isLocal = isLocalMode(splitted[1]); - args.remove(i); - } - // memory requirement for YARN application master - else if (splitted[0].equals(AM_MEMORY_FLAG)) { - amMem = Integer.parseInt(splitted[1]); - args.remove(i); - } - // memory requirement for YARN worker container - else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) { - containerMem = Integer.parseInt(splitted[1]); - args.remove(i); - } - // the path to JAR archive that we need to upload to HDFS - else if (splitted[0].equals(JAR_PACKAGE_FLAG)) { - jarPackagePath = splitted[1]; - args.remove(i); - } - // the HDFS dir for SAMOA files - else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) { - samoaHDFSDir = splitted[1]; - if (samoaHDFSDir.length() < 1) - samoaHDFSDir = null; - args.remove(i); - } - // number of max PI instances per container - // this will be used to compute the number of containers - // AM will request for the job - else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) { - piPerContainer = Integer.parseInt(splitted[1]); - args.remove(i); - } - // kafka streams replication factor - else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) { - kafkaReplicationFactor = Integer.parseInt(splitted[1]); - args.remove(i); - } - // checkpoint frequency in ms - else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) { - checkpointFrequency = Integer.parseInt(splitted[1]); - args.remove(i); - } - // the file contains registration information for Kryo serializer - else if (splitted[0].equals(KRYO_REGISTER_FLAG)) { - kryoRegisterFile = splitted[1]; - args.remove(i); - } - } - } - } - - private static String uploadJarToHDFS(File file) { - SystemsUtils.setHadoopConfigHome(yarnConfHome); - SystemsUtils.setSAMOADir(samoaHDFSDir); - return SystemsUtils.copyToHDFS(file, file.getName()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java deleted file mode 100644 index 486472c..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemFactory; -import org.apache.samza.system.SystemProducer; -import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; - -import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer; - -/** - * Implementation of Samza's SystemFactory Samza will use this factory to get our custom consumer which gets the events - * from SAMOA EntranceProcessor and feed them to EntranceProcessingItem task - * - * @author Anh Thu Vu - */ -public class SamoaSystemFactory implements SystemFactory { - @Override - public SystemAdmin getAdmin(String systemName, Config config) { - return new SinglePartitionWithoutOffsetsSystemAdmin(); - } - - @Override - public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - return new SamoaSystemConsumer(systemName, config); - } - - @Override - public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - throw new SamzaException("This implementation is not supposed to produce anything."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java deleted file mode 100644 index 813c3b3..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * Implementation of SAMOA ComponentFactory for Samza - * - * @author Anh Thu Vu - */ -public class SamzaComponentFactory implements ComponentFactory { - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } - - @Override - public ProcessingItem createPi(Processor processor, int parallelism) { - return new SamzaProcessingItem(processor, parallelism); - } - - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - return new SamzaEntranceProcessingItem(entranceProcessor); - } - - @Override - public Stream createStream(IProcessingItem sourcePi) { - return new SamzaStream(sourcePi); - } - - @Override - public Topology createTopology(String topoName) { - return new SamzaTopology(topoName); - } -}
