package org.apache.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.List;
import java.util.Set;

import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.Topology;
import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import org.apache.samoa.utils.SamzaConfigFactory;
import org.apache.samoa.utils.SystemsUtils;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.JobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Submits a list of Samza jobs with the Configs generated from the input topology.
 *
 * @author Anh Thu Vu
 */
public class SamzaEngine {

  private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class);

  /** Singleton instance (fluent setters configure it before submission). */
  private static final SamzaEngine engine = new SamzaEngine();

  // Cluster/runtime settings, populated via the fluent setters below.
  private String zookeeper;
  private String kafka;
  private int kafkaReplicationFactor;
  private boolean isLocalMode;
  private String yarnPackagePath;
  private String yarnConfHome;

  private String kryoRegisterFile;

  private int amMem;
  private int containerMem;
  private int piPerContainerRatio;

  private int checkpointFrequency;

  /**
   * Translates the topology into per-job Samza configs, creates the backing
   * Kafka topics, and runs one Samza job per generated config.
   */
  private void _submitTopology(SamzaTopology topology) {

    // Setup SamzaConfigFactory with all engine-level settings.
    SamzaConfigFactory configFactory = new SamzaConfigFactory();
    configFactory.setLocalMode(isLocalMode)
        .setZookeeper(zookeeper)
        .setKafka(kafka)
        .setYarnPackage(yarnPackagePath)
        .setAMMemory(amMem)
        .setContainerMemory(containerMem)
        .setPiPerContainerRatio(piPerContainerRatio)
        .setKryoRegisterFile(kryoRegisterFile)
        .setCheckpointFrequency(checkpointFrequency)
        .setReplicationFactor(kafkaReplicationFactor);

    // Generate the list of Configs. The factory also serializes a map of PIs
    // and stores it in a file in the jar at jarFilePath (in dat/ folder).
    List<MapConfig> configs;
    try {
      configs = configFactory.getMapConfigsForTopology(topology);
    } catch (Exception e) {
      // Log via the class logger (was printStackTrace) and abort: without
      // configs there is nothing to submit.
      logger.error("Failed to generate Samza configs for the topology", e);
      return;
    }

    // Create one Kafka topic per system stream; all streams are expected to
    // be Kafka streams.
    Set<Stream> streams = topology.getStreams();
    for (Stream stream : streams) {
      SamzaStream samzaStream = (SamzaStream) stream;
      List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams();
      for (SamzaSystemStream systemStream : systemStreams) {
        SystemsUtils.createKafkaTopic(systemStream.getStream(), systemStream.getParallelism(),
            kafkaReplicationFactor);
      }
    }

    // Submit the jobs with those configs.
    for (MapConfig config : configs) {
      logger.info("Config:{}", config);
      JobRunner jobRunner = new JobRunner(config);
      jobRunner.run();
    }
  }

  /** Configures SystemsUtils; Hadoop config is only needed outside local mode. */
  private void _setupSystemsUtils() {
    if (!isLocalMode) {
      SystemsUtils.setHadoopConfigHome(yarnConfHome);
    }
    SystemsUtils.setZookeeper(zookeeper);
  }

  /*
   * Accessor and fluent setter methods
   */
  public static SamzaEngine getEngine() {
    return engine;
  }

  public SamzaEngine setZooKeeper(String zk) {
    this.zookeeper = zk;
    return this;
  }

  public SamzaEngine setKafka(String kafka) {
    this.kafka = kafka;
    return this;
  }

  public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
    this.kafkaReplicationFactor = replicationFactor;
    return this;
  }

  public SamzaEngine setCheckpointFrequency(int freq) {
    this.checkpointFrequency = freq;
    return this;
  }

  public SamzaEngine setLocalMode(boolean isLocal) {
    this.isLocalMode = isLocal;
    return this;
  }

  public SamzaEngine setYarnPackage(String yarnPackagePath) {
    this.yarnPackagePath = yarnPackagePath;
    return this;
  }

  public SamzaEngine setConfigHome(String configHome) {
    this.yarnConfHome = configHome;
    return this;
  }

  public SamzaEngine setAMMemory(int mem) {
    this.amMem = mem;
    return this;
  }

  public SamzaEngine setContainerMemory(int mem) {
    this.containerMem = mem;
    return this;
  }

  public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
    this.piPerContainerRatio = piPerContainer;
    return this;
  }

  public SamzaEngine setKryoRegisterFile(String registerFile) {
    this.kryoRegisterFile = registerFile;
    return this;
  }

  /**
   * Submits a list of Samza jobs corresponding to the submitted topology.
   *
   * @param topo
   *          the submitted topology
   */
  public static void submitTopology(SamzaTopology topo) {
    // Setup SystemsUtils
    engine._setupSystemsUtils();

    // Submit topology
    engine._submitTopology(topo);
  }
}
package org.apache.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.EntranceProcessor;
import org.apache.samoa.topology.AbstractEntranceProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.utils.SamzaConfigFactory;
import org.apache.samoa.utils.SystemsUtils;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * EntranceProcessingItem for Samza which is also a Samza task (StreamTask &amp; InitableTask).
 *
 * @author Anh Thu Vu
 */
public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem
    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {

  private static final long serialVersionUID = 7157734520046135039L;

  /*
   * Constructors
   */
  public SamzaEntranceProcessingItem(EntranceProcessor processor) {
    super(processor);
  }

  // Need this so Samza can initialize a StreamTask
  public SamzaEntranceProcessingItem() {
  }

  /**
   * Registers the single output stream of this entrance PI.
   *
   * @return always 1: an entrance PI has exactly one output stream
   */
  @Override
  public int addOutputStream(SamzaStream stream) {
    this.setOutputStream(stream);
    return 1; // entrance PI should have only 1 output stream
  }

  /*
   * Serialization: delegate to a compact proxy holding only the state we
   * need to reconstruct the task on the Samza side.
   */
  private Object writeReplace() {
    return new SerializationProxy(this);
  }

  private static class SerializationProxy implements Serializable {

    private static final long serialVersionUID = 313907132721414634L;

    private EntranceProcessor processor;
    private SamzaStream outputStream;
    private String name;

    public SerializationProxy(SamzaEntranceProcessingItem epi) {
      this.processor = epi.getProcessor();
      this.outputStream = (SamzaStream) epi.getOutputStream();
      this.name = epi.getName();
    }
  }

  /*
   * Implement Samza Task
   */
  @Override
  public void init(Config config, TaskContext context) throws Exception {
    // If the property is set, configure Hadoop; otherwise assume local mode
    // and ignore it.
    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
    if (yarnConfHome != null && yarnConfHome.length() > 0) {
      SystemsUtils.setHadoopConfigHome(yarnConfHome);
    }

    String filename = config.get(SamzaConfigFactory.FILE_KEY);
    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);

    // Deserialize the proxy written at topology-construction time, keyed by
    // this job's name.
    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
    SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
        filename, this.getName());
    this.setOutputStream(wrapper.outputStream);
    SamzaStream output = (SamzaStream) this.getOutputStream();
    if (output != null) { // if output stream exists, set it up
      output.onCreate();
    }
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    SamzaStream output = (SamzaStream) this.getOutputStream();
    if (output == null)
      return; // if there is no output stream, do nothing
    output.setCollector(collector);
    ContentEvent event = (ContentEvent) envelope.getMessage();
    output.put(event);
  }

  /*
   * Implementation of Samza's SystemConsumer to get events from source and
   * feed them into the SAMOA system.
   *
   * Current implementation: buffer the incoming events and send a batch of
   * them when poll() is called by the Samza system.
   *
   * It has a "soft" limit on the size of the buffer: when the buffer size
   * reaches the limit, the reading thread sleeps for 100ms. A hard limit
   * could be achieved by overriding BlockingEnvelopeMap's protected
   * newBlockingQueue(), but then the full-queue case must be handled.
   */
  public static class SamoaSystemConsumer extends BlockingEnvelopeMap {

    private EntranceProcessor entranceProcessor = null;
    private SystemStreamPartition systemStreamPartition;

    private static final Logger logger = LoggerFactory.getLogger(SamoaSystemConsumer.class);

    public SamoaSystemConsumer(String systemName, Config config) {
      // If the property is set, configure Hadoop; otherwise assume local
      // mode and ignore it.
      String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
      if (yarnConfHome != null && yarnConfHome.length() > 0) {
        SystemsUtils.setHadoopConfigHome(yarnConfHome);
      }

      String filename = config.get(SamzaConfigFactory.FILE_KEY);
      String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
      String name = config.get(SamzaConfigFactory.JOB_NAME_KEY);
      SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
          filename, name);

      this.entranceProcessor = wrapper.processor;
      this.entranceProcessor.onCreate(0);

      // Internal stream from SystemConsumer to EntranceTask, so we need only
      // one partition.
      this.systemStreamPartition = new SystemStreamPartition(systemName, wrapper.name, new Partition(0));
    }

    @Override
    public void start() {
      // Poll the entrance processor on a dedicated thread so start() returns
      // promptly.
      Thread processorPollingThread = new Thread(
          new Runnable() {
            @Override
            public void run() {
              try {
                pollingEntranceProcessor();
                setIsAtHead(systemStreamPartition, true);
              } catch (InterruptedException e) {
                // Restore the interrupt flag and log; the original code only
                // called e.getStackTrace(), discarding the result.
                Thread.currentThread().interrupt();
                logger.warn("Entrance-processor polling thread interrupted; stopping", e);
                stop();
              }
            }
          }
          );

      processorPollingThread.start();
    }

    @Override
    public void stop() {
      // No resources to release; the polling thread exits on its own.
    }

    /**
     * Drains the entrance processor into the blocking envelope map, sleeping
     * briefly whenever the processor has no event ready or the queue exceeds
     * the soft 10000-message limit.
     */
    private void pollingEntranceProcessor() throws InterruptedException {
      int messageCnt = 0;
      while (!this.entranceProcessor.isFinished()) {
        messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
        if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft limit on the size of the queue
          this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null,
              this.entranceProcessor.nextEvent()));
        } else {
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            break;
          }
        }
      }

      // Send last event; presumably nextEvent() yields a terminal marker
      // once the processor is finished — TODO confirm against
      // EntranceProcessor implementations.
      this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null,
          this.entranceProcessor.nextEvent()));
    }

  }
}
package org.apache.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.Processor;
import org.apache.samoa.topology.AbstractProcessingItem;
import org.apache.samoa.topology.ProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import org.apache.samoa.utils.PartitioningScheme;
import org.apache.samoa.utils.SamzaConfigFactory;
import org.apache.samoa.utils.StreamDestination;
import org.apache.samoa.utils.SystemsUtils;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

/**
 * ProcessingItem for Samza which is also a Samza task (StreamTask and InitableTask).
 *
 * @author Anh Thu Vu
 */
public class SamzaProcessingItem extends AbstractProcessingItem
    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {

  private static final long serialVersionUID = 1L;

  // Initialized at declaration so the no-arg (Samza-created) instance never
  // exposes null collections; previously only the two-arg constructor
  // initialized them.
  private Set<SamzaSystemStream> inputStreams = new HashSet<SamzaSystemStream>(); // input streams: system.stream
  private List<SamzaStream> outputStreams = new LinkedList<SamzaStream>();

  /*
   * Constructors
   */
  // Need this so Samza can initialize a StreamTask
  public SamzaProcessingItem() {
  }

  public SamzaProcessingItem(Processor processor, int parallelismHint) {
    super(processor, parallelismHint);
  }

  /*
   * Simple setters, getters
   */
  public Set<SamzaSystemStream> getInputStreams() {
    return this.inputStreams;
  }

  /**
   * Registers this PI as a destination of the given stream and records the
   * resulting system stream as one of this PI's inputs.
   */
  @Override
  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
    SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this, this
        .getParallelism(), scheme));
    this.inputStreams.add(stream);
    return this;
  }

  /**
   * Adds an output stream to this PI.
   *
   * @return the number of output streams after the addition
   */
  @Override
  public int addOutputStream(SamzaStream stream) {
    this.outputStreams.add(stream);
    return this.outputStreams.size();
  }

  public List<SamzaStream> getOutputStreams() {
    return this.outputStreams;
  }

  /*
   * Implement Samza task
   */
  @Override
  public void init(Config config, TaskContext context) throws Exception {
    // If the property is set, configure Hadoop; otherwise assume local mode
    // and ignore it.
    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
    if (yarnConfHome != null && yarnConfHome.length() > 0) {
      SystemsUtils.setHadoopConfigHome(yarnConfHome);
    }

    String filename = config.get(SamzaConfigFactory.FILE_KEY);
    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
    // Rehydrate the processor and output streams written at
    // topology-construction time, keyed by this job's name.
    SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
        filename, this.getName());
    this.setProcessor(wrapper.processor);
    this.outputStreams = wrapper.outputStreams;

    // Init Processor and Streams
    this.getProcessor().onCreate(0);
    for (SamzaStream stream : this.outputStreams) {
      stream.onCreate();
    }
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    // Bind the per-call collector to every output stream before processing.
    for (SamzaStream stream : this.outputStreams) {
      stream.setCollector(collector);
    }
    this.getProcessor().process((ContentEvent) envelope.getMessage());
  }

  /*
   * Serialization: compact proxy holding only the processor and its output
   * streams.
   */
  private Object writeReplace() {
    return new SerializationProxy(this);
  }

  private static class SerializationProxy implements Serializable {

    private static final long serialVersionUID = 1534643987559070336L;

    private Processor processor;
    private List<SamzaStream> outputStreams;

    public SerializationProxy(SamzaProcessingItem pi) {
      this.processor = pi.getProcessor();
      this.outputStreams = pi.getOutputStreams();
    }
  }

}
+ * #L% + */ + +import org.apache.samoa.topology.IProcessingItem; + +/** + * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem + * + * @author Anh Thu Vu + */ +public interface SamzaProcessingNode extends IProcessingItem { + /** + * Registers an output stream with this processing item + * + * @param stream + * the output stream + * @return the number of output streams of this processing item + */ + public int addOutputStream(SamzaStream stream); + + /** + * Gets the name/id of this processing item + * + * @return the name/id of this processing item + */ + // TODO: include getName() and setName() in IProcessingItem and/or + // AbstractEPI/PI + public String getName(); + + /** + * Sets the name/id for this processing item + * + * @param name + * the name/id of this processing item + */ + // TODO: include getName() and setName() in IProcessingItem and/or + // AbstractEPI/PI + public void setName(String name); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java new file mode 100644 index 0000000..da3f8bc --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java @@ -0,0 +1,245 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
package org.apache.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.topology.AbstractStream;
import org.apache.samoa.topology.IProcessingItem;
import org.apache.samoa.utils.PartitioningScheme;
import org.apache.samoa.utils.StreamDestination;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;

/**
 * Stream for SAMOA on Samza.
 *
 * @author Anh Thu Vu
 */
public class SamzaStream extends AbstractStream implements Serializable {

  private static final long serialVersionUID = 1L;

  private static final String DEFAULT_SYSTEM_NAME = "kafka";

  private List<SamzaSystemStream> systemStreams;
  private transient MessageCollector collector;
  private String systemName;

  /**
   * Creates a stream sourced from the given PI; registers itself with the PI
   * and derives the stream id from the PI name and the stream index.
   */
  public SamzaStream(IProcessingItem sourcePi) {
    super(sourcePi);
    this.systemName = DEFAULT_SYSTEM_NAME;
    // Get name/id for this stream
    SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
    int index = samzaPi.addOutputStream(this);
    this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index));
    // init list of SamzaSystemStream
    systemStreams = new ArrayList<SamzaSystemStream>();
  }

  /**
   * Sets the system name (Kafka) and propagates it to all existing system
   * streams.
   */
  public void setSystemName(String systemName) {
    this.systemName = systemName;
    for (SamzaSystemStream systemStream : systemStreams) {
      systemStream.setSystem(systemName);
    }
  }

  public String getSystemName() {
    return this.systemName;
  }

  /**
   * Adds the PI to the list of destinations and returns the corresponding
   * SystemStream, reusing an existing one when the partitioning scheme and
   * parallelism already match.
   */
  public SamzaSystemStream addDestination(StreamDestination destination) {
    PartitioningScheme scheme = destination.getPartitioningScheme();
    int parallelism = destination.getParallelism();

    // Reuse an existing SystemStream that matches the settings; stop at the
    // first match (creation below guarantees at most one per
    // scheme/parallelism pair).
    SamzaSystemStream resultStream = null;
    for (int i = 0; i < systemStreams.size(); i++) {
      if (systemStreams.get(i).isSame(scheme, parallelism)) {
        resultStream = systemStreams.get(i);
        break;
      }
    }

    // No existing SystemStream matches the requirement: create a new one.
    if (resultStream == null) {
      String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size());
      resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism);
      systemStreams.add(resultStream);
    }

    return resultStream;
  }

  public void setCollector(MessageCollector collector) {
    this.collector = collector;
  }

  public MessageCollector getCollector() {
    return this.collector;
  }

  /** Initializes the actual Samza SystemStream of every registered stream. */
  public void onCreate() {
    for (SamzaSystemStream stream : systemStreams) {
      stream.initSystemStream();
    }
  }

  /*
   * Implement Stream interface
   */
  @Override
  public void put(ContentEvent event) {
    for (SamzaSystemStream stream : systemStreams) {
      stream.send(collector, event);
    }
  }

  public List<SamzaSystemStream> getSystemStreams() {
    return this.systemStreams;
  }

  /**
   * SamzaSystemStream wraps a Samza SystemStream. It contains the info to
   * create a Samza stream during the construction of the topology and
   * creates the actual Samza stream when the topology is submitted
   * (invoking initSystemStream()).
   *
   * @author Anh Thu Vu
   */
  public static class SamzaSystemStream implements Serializable {

    private static final long serialVersionUID = 1L;

    private String system;
    private String stream;
    private PartitioningScheme scheme;
    private int parallelism;

    private transient SystemStream actualSystemStream = null;

    /*
     * Constructors
     */
    public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) {
      this.system = system;
      this.stream = stream;
      this.scheme = scheme;
      this.parallelism = parallelism;
    }

    public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) {
      this(system, stream, scheme, 1);
    }

    /*
     * Setters
     */
    public void setSystem(String system) {
      this.system = system;
    }

    /*
     * Getters
     */
    public String getSystem() {
      return this.system;
    }

    public String getStream() {
      return this.stream;
    }

    public PartitioningScheme getPartitioningScheme() {
      return this.scheme;
    }

    public int getParallelism() {
      return this.parallelism;
    }

    public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
      return (this.scheme == scheme && this.parallelism == parallelismHint);
    }

    /** Creates the actual Samza SystemStream. */
    public void initSystemStream() {
      actualSystemStream = new SystemStream(this.system, this.stream);
    }

    /**
     * Sends a ContentEvent, dispatching on the partitioning scheme.
     */
    public void send(MessageCollector collector, ContentEvent contentEvent) {
      if (actualSystemStream == null)
        this.initSystemStream();

      switch (this.scheme) {
      case SHUFFLE:
        this.sendShuffle(collector, contentEvent);
        break;
      case GROUP_BY_KEY:
        this.sendGroupByKey(collector, contentEvent);
        break;
      case BROADCAST:
        this.sendBroadcast(collector, contentEvent);
        break;
      }
    }

    /*
     * Helpers. All three send paths are synchronized for consistency;
     * sendGroupByKey previously was not, unlike its siblings.
     */
    private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) {
      // No partition key: Samza/Kafka chooses the partition.
      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event));
    }

    private synchronized void sendGroupByKey(MessageCollector collector, ContentEvent event) {
      // Partition by the event key so same-key events land on one partition.
      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
    }

    private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) {
      // One envelope per partition so every task instance receives the event.
      for (int i = 0; i < parallelism; i++) {
        collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
      }
    }
  }
}
package org.apache.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.HashSet;
import java.util.Set;

import org.apache.samoa.topology.AbstractTopology;
import org.apache.samoa.topology.IProcessingItem;

/**
 * Topology for Samza.
 *
 * @author Anh Thu Vu
 */
public class SamzaTopology extends AbstractTopology {

  // Running counter used to hand each PI a unique "<topology>-<n>" name.
  private int procItemCounter;

  public SamzaTopology(String topoName) {
    super(topoName);
    this.procItemCounter = 0;
  }

  /**
   * Registers the PI with the topology and names it after the topology plus
   * its registration ordinal.
   */
  @Override
  public void addProcessingItem(IProcessingItem procItem, int parallelism) {
    super.addProcessingItem(procItem, parallelism);
    SamzaProcessingNode node = (SamzaProcessingNode) procItem;
    node.setName(this.getTopologyName() + "-" + procItemCounter);
    this.procItemCounter++;
  }

  /**
   * Returns the set of ProcessingItems excluding EntrancePIs. Used by
   * SamzaConfigFactory, as the configs for EntrancePIs and normal PIs
   * differ.
   *
   * @throws Exception
   *           if no entrance PI could be removed from the copied set
   */
  public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception {
    Set<IProcessingItem> nonEntranceItems = new HashSet<IProcessingItem>(this.getProcessingItems());
    if (!nonEntranceItems.removeAll(this.getEntranceProcessingItems())) {
      throw new Exception("Failed extracting the set of non-entrance processing items");
    }
    return nonEntranceItems;
  }
}
(the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.impl.SamoaSystemFactory; +import org.apache.samoa.topology.impl.SamzaEntranceProcessingItem; +import org.apache.samoa.topology.impl.SamzaProcessingItem; +import org.apache.samoa.topology.impl.SamzaStream; +import org.apache.samoa.topology.impl.SamzaTopology; +import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream; +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.local.LocalJobFactory; +import org.apache.samza.job.yarn.YarnJobFactory; +import org.apache.samza.system.kafka.KafkaSystemFactory; + +/** + * Generate Configs that will be used to submit Samza jobs from the input topology (one config per PI/EntrancePI in the + * topology) + * + * @author Anh Thu Vu + * + */ +public class SamzaConfigFactory { + public static final String SYSTEM_NAME = "samoa"; + + // DEFAULT VALUES + private static final String 
DEFAULT_ZOOKEEPER = "localhost:2181"; + private static final String DEFAULT_BROKER_LIST = "localhost:9092"; + + // DELIMINATORS + public static final String COMMA = ","; + public static final String COLON = ":"; + public static final String DOT = "."; + public static final char DOLLAR_SIGN = '$'; + public static final char QUESTION_MARK = '?'; + + // PARTITIONING SCHEMES + public static final String SHUFFLE = "shuffle"; + public static final String KEY = "key"; + public static final String BROADCAST = "broadcast"; + + // PROPERTY KEYS + // JOB + public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class"; + public static final String JOB_NAME_KEY = "job.name"; + // YARN + public static final String YARN_PACKAGE_KEY = "yarn.package.path"; + public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb"; + public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb"; + public static final String CONTAINER_COUNT_KEY = "yarn.container.count"; + // TASK (SAMZA original) + public static final String TASK_CLASS_KEY = "task.class"; + public static final String TASK_INPUTS_KEY = "task.inputs"; + // TASK (extra) + public static final String FILE_KEY = "task.processor.file"; + public static final String FILESYSTEM_KEY = "task.processor.filesystem"; + public static final String ENTRANCE_INPUT_KEY = "task.entrance.input"; + public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs"; + public static final String YARN_CONF_HOME_KEY = "yarn.config.home"; + // KAFKA + public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect"; + public static final String BROKER_URI_KEY = "producer.metadata.broker.list"; + public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages"; + public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type"; + // SERDE + public static final String SERDE_REGISTRATION_KEY = "kryo.register"; + + // Instance variables + private boolean isLocalMode; + private 
String zookeeper; + private String kafkaBrokerList; + private int replicationFactor; + private int amMemory; + private int containerMemory; + private int piPerContainerRatio; + private int checkpointFrequency; // in ms + + private String jarPath; + private String kryoRegisterFile = null; + + public SamzaConfigFactory() { + this.isLocalMode = false; + this.zookeeper = DEFAULT_ZOOKEEPER; + this.kafkaBrokerList = DEFAULT_BROKER_LIST; + this.checkpointFrequency = 60000; // default: 1 minute + this.replicationFactor = 1; + } + + /* + * Setter methods + */ + public SamzaConfigFactory setYarnPackage(String packagePath) { + this.jarPath = packagePath; + return this; + } + + public SamzaConfigFactory setLocalMode(boolean isLocal) { + this.isLocalMode = isLocal; + return this; + } + + public SamzaConfigFactory setZookeeper(String zk) { + this.zookeeper = zk; + return this; + } + + public SamzaConfigFactory setKafka(String brokerList) { + this.kafkaBrokerList = brokerList; + return this; + } + + public SamzaConfigFactory setCheckpointFrequency(int freq) { + this.checkpointFrequency = freq; + return this; + } + + public SamzaConfigFactory setReplicationFactor(int replicationFactor) { + this.replicationFactor = replicationFactor; + return this; + } + + public SamzaConfigFactory setAMMemory(int mem) { + this.amMemory = mem; + return this; + } + + public SamzaConfigFactory setContainerMemory(int mem) { + this.containerMemory = mem; + return this; + } + + public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) { + this.piPerContainerRatio = piPerContainer; + return this; + } + + public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) { + this.kryoRegisterFile = kryoRegister; + return this; + } + + /* + * Generate a map of all config properties for the input SamzaProcessingItem + */ + private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception { + Map<String, String> map = getBasicSystemConfig(); + + 
// Set job name, task class, task inputs (from SamzaProcessingItem) + setJobName(map, pi.getName()); + setTaskClass(map, SamzaProcessingItem.class.getName()); + + StringBuilder streamNames = new StringBuilder(); + boolean first = true; + for (SamzaSystemStream stream : pi.getInputStreams()) { + if (!first) + streamNames.append(COMMA); + streamNames.append(stream.getSystem() + DOT + stream.getStream()); + if (first) + first = false; + } + setTaskInputs(map, streamNames.toString()); + + // Processor file + setFileName(map, filename); + setFileSystem(map, filesystem); + + List<String> nameList = new ArrayList<String>(); + // Default kafka system: kafka0: sync producer + // This system is always required: it is used for checkpointing + nameList.add("kafka0"); + setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1); + // Output streams: set kafka systems + for (SamzaStream stream : pi.getOutputStreams()) { + boolean found = false; + for (String name : nameList) { + if (stream.getSystemName().equals(name)) { + found = true; + break; + } + } + if (!found) { + nameList.add(stream.getSystemName()); + setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize()); + } + } + // Input streams: set kafka systems + for (SamzaSystemStream stream : pi.getInputStreams()) { + boolean found = false; + for (String name : nameList) { + if (stream.getSystem().equals(name)) { + found = true; + break; + } + } + if (!found) { + nameList.add(stream.getSystem()); + setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1); + } + } + + // Checkpointing + setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"); + setValue(map, "task.checkpoint.system", "kafka0"); + setValue(map, "task.commit.ms", "1000"); + setValue(map, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor)); + + // Number of containers + setNumberOfContainers(map, 
pi.getParallelism(), this.piPerContainerRatio); + + return map; + } + + /* + * Generate a map of all config properties for the input SamzaProcessingItem + */ + public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) { + Map<String, String> map = getBasicSystemConfig(); + + // Set job name, task class (from SamzaEntranceProcessingItem) + setJobName(map, epi.getName()); + setTaskClass(map, SamzaEntranceProcessingItem.class.getName()); + + // Input for the entrance task (from our custom consumer) + setTaskInputs(map, SYSTEM_NAME + "." + epi.getName()); + + // Output from entrance task + // Since entrancePI should have only 1 output stream + // there is no need for checking the batch size, setting different system + // names + // The custom consumer (samoa system) does not suuport reading from a + // specific index + // => no need for checkpointing + SamzaStream outputStream = (SamzaStream) epi.getOutputStream(); + // Set samoa system factory + setValue(map, "systems." 
+ SYSTEM_NAME + ".samza.factory", SamoaSystemFactory.class.getName()); + // Set Kafka system (only if there is an output stream) + if (outputStream != null) + setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList, + outputStream.getBatchSize()); + + // Processor file + setFileName(map, filename); + setFileSystem(map, filesystem); + + // Number of containers + setNumberOfContainers(map, 1, this.piPerContainerRatio); + + return map; + } + + /* + * Generate a list of map (of config properties) for all PIs and EPI in the + * input topology + */ + public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) throws Exception { + + List<Map<String, String>> maps = new ArrayList<Map<String, String>>(); + + // File to write serialized objects + String filename = topology.getTopologyName() + ".dat"; + Path dirPath = FileSystems.getDefault().getPath("dat"); + Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename); + String dstPath = filePath.toString(); + String resPath; + String filesystem; + if (this.isLocalMode) { + filesystem = SystemsUtils.LOCAL_FS; + File dir = dirPath.toFile(); + if (!dir.exists()) + FileUtils.forceMkdir(dir); + } + else { + filesystem = SystemsUtils.HDFS; + } + + // Correct system name for streams + this.setSystemNameForStreams(topology.getStreams()); + + // Add all PIs to a collection (map) + Map<String, Object> piMap = new HashMap<String, Object>(); + Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems(); + Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems(); + for (EntranceProcessingItem epi : entranceProcessingItems) { + SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; + piMap.put(sepi.getName(), sepi); + } + for (IProcessingItem pi : processingItems) { + SamzaProcessingItem spi = (SamzaProcessingItem) pi; + piMap.put(spi.getName(), spi); + } + + // Serialize all PIs + boolean serialized = 
false; + if (this.isLocalMode) { + serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath); + resPath = dstPath; + } + else { + resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath); + serialized = resPath != null; + } + + if (!serialized) { + throw new Exception("Fail serialize map of PIs to file"); + } + + // MapConfig for all PIs + for (EntranceProcessingItem epi : entranceProcessingItems) { + SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; + maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem)); + } + for (IProcessingItem pi : processingItems) { + SamzaProcessingItem spi = (SamzaProcessingItem) pi; + maps.add(this.getMapForPI(spi, resPath, filesystem)); + } + + return maps; + } + + /** + * Construct a list of MapConfigs for a Topology + * + * @return the list of MapConfigs + * @throws Exception + */ + public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception { + List<MapConfig> configs = new ArrayList<MapConfig>(); + List<Map<String, String>> maps = this.getMapsForTopology(topology); + for (Map<String, String> map : maps) { + configs.add(new MapConfig(map)); + } + return configs; + } + + /* + * + */ + public void setSystemNameForStreams(Set<Stream> streams) { + Map<Integer, String> batchSizeMap = new HashMap<Integer, String>(); + batchSizeMap.put(1, "kafka0"); // default system with sync producer + int counter = 0; + for (Stream stream : streams) { + SamzaStream samzaStream = (SamzaStream) stream; + String systemName = batchSizeMap.get(samzaStream.getBatchSize()); + if (systemName == null) { + counter++; + // Add new system + systemName = "kafka" + Integer.toString(counter); + batchSizeMap.put(samzaStream.getBatchSize(), systemName); + } + samzaStream.setSystemName(systemName); + } + + } + + /* + * Generate a map with common properties for PIs and EPI + */ + private Map<String, String> getBasicSystemConfig() { + Map<String, String> map = new HashMap<String, String>(); + // 
Job & Task + if (this.isLocalMode) + map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName()); + else { + map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName()); + + // yarn + map.put(YARN_PACKAGE_KEY, jarPath); + map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory)); + map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory)); + map.put(CONTAINER_COUNT_KEY, "1"); + map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome()); + + // Task opts (Heap size = 0.75 container memory) + int heapSize = (int) (0.75 * this.containerMemory); + map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M -XX:+PrintGCDateStamps"); + } + + map.put(JOB_NAME_KEY, ""); + map.put(TASK_CLASS_KEY, ""); + map.put(TASK_INPUTS_KEY, ""); + + // register serializer + map.put("serializers.registry.kryo.class", SamzaKryoSerdeFactory.class.getName()); + + // Serde registration + setKryoRegistration(map, this.kryoRegisterFile); + + return map; + } + + /* + * Helper methods to set different properties in the input map + */ + private static void setJobName(Map<String, String> map, String jobName) { + map.put(JOB_NAME_KEY, jobName); + } + + private static void setFileName(Map<String, String> map, String filename) { + map.put(FILE_KEY, filename); + } + + private static void setFileSystem(Map<String, String> map, String filesystem) { + map.put(FILESYSTEM_KEY, filesystem); + } + + private static void setTaskClass(Map<String, String> map, String taskClass) { + map.put(TASK_CLASS_KEY, taskClass); + } + + private static void setTaskInputs(Map<String, String> map, String inputs) { + map.put(TASK_INPUTS_KEY, inputs); + } + + private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) { + if (kryoRegisterFile != null) { + String value = readKryoRegistration(kryoRegisterFile); + map.put(SERDE_REGISTRATION_KEY, value); + } + } + + private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) { + int res 
= parallelism / piPerContainer; + if (parallelism % piPerContainer != 0) + res++; + map.put(CONTAINER_COUNT_KEY, Integer.toString(res)); + } + + private static void setKafkaSystem(Map<String, String> map, String systemName, String zk, String brokers, + int batchSize) { + map.put("systems." + systemName + ".samza.factory", KafkaSystemFactory.class.getName()); + map.put("systems." + systemName + ".samza.msg.serde", "kryo"); + + map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk); + map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers); + map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, Integer.toString(batchSize)); + + map.put("systems." + systemName + ".samza.offset.default", "oldest"); + + if (batchSize > 1) { + map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "async"); + } + else { + map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync"); + } + } + + // Set custom properties + private static void setValue(Map<String, String> map, String key, String value) { + map.put(key, value); + } + + /* + * Helper method to parse Kryo registration file + */ + private static String readKryoRegistration(String filePath) { + FileInputStream fis = null; + Properties props = new Properties(); + StringBuilder result = new StringBuilder(); + try { + fis = new FileInputStream(filePath); + props.load(fis); + + boolean first = true; + String value = null; + for (String k : props.stringPropertyNames()) { + if (!first) + result.append(COMMA); + else + first = false; + + // Need to avoid the dollar sign as samza pass all the properties in + // the config to containers via commandline parameters/enviroment + // variables + // We might escape the dollar sign, but it's more complicated than + // replacing it with something else + result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); + value = props.getProperty(k); + if (value != null && value.trim().length() > 0) { + result.append(COLON); + 
result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); + } + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + if (fis != null) + try { + fis.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java b/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java new file mode 100644 index 0000000..90c0f25 --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java @@ -0,0 +1,158 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.ByteArrayOutputStream; + +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Implementation of Samza's SerdeFactory that uses Kryo to serialize/deserialize objects + * + * @author Anh Thu Vu + * @param <T> + * + */ +public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> { + + private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class); + + public static class SamzaKryoSerde<V> implements Serde<V> { + private Kryo kryo; + + public SamzaKryoSerde(String registrationInfo) { + this.kryo = new Kryo(); + this.register(registrationInfo); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void register(String registrationInfo) { + if (registrationInfo == null) + return; + + String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA); + + Class targetClass = null; + Class serializerClass = null; + Serializer serializer = null; + + for (String info : infoList) { + String[] fields = info.split(SamzaConfigFactory.COLON); + + targetClass = null; + serializerClass = null; + if (fields.length > 0) { + try { + targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, + SamzaConfigFactory.DOLLAR_SIGN)); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + if (fields.length > 1) { + try { + serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, + SamzaConfigFactory.DOLLAR_SIGN)); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + if (targetClass != null) { + if (serializerClass == null) { + 
kryo.register(targetClass); + } + else { + serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>) serializerClass); + kryo.register(targetClass, serializer); + } + } + else { + logger.info("Invalid registration info:{}", info); + } + } + } + + @SuppressWarnings("rawtypes") + private static Serializer resolveSerializerInstance(Kryo k, Class superClass, + Class<? extends Serializer> serializerClass) { + try { + try { + return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass); + } catch (Exception ex1) { + try { + return serializerClass.getConstructor(Kryo.class).newInstance(k); + } catch (Exception ex2) { + try { + return serializerClass.getConstructor(Class.class).newInstance(superClass); + } catch (Exception ex3) { + return serializerClass.newInstance(); + } + } + } + } catch (Exception ex) { + throw new IllegalArgumentException("Unable to create serializer \"" + + serializerClass.getName() + + "\" for class: " + + superClass.getName(), ex); + } + } + + /* + * Implement Samza Serde interface + */ + @Override + public byte[] toBytes(V obj) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + kryo.writeClassAndObject(output, obj); + output.flush(); + output.close(); + return bos.toByteArray(); + } + + @SuppressWarnings("unchecked") + @Override + public V fromBytes(byte[] byteArr) { + Input input = new Input(byteArr); + Object obj = kryo.readClassAndObject(input); + input.close(); + return (V) obj; + } + + } + + @Override + public Serde<T> getSerde(String name, Config config) { + return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java ---------------------------------------------------------------------- diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java b/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java new file mode 100644 index 0000000..4932ba8 --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java @@ -0,0 +1,70 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Serialize and deserialize objects with Java serialization + * + * @author Anh Thu Vu + */ +public class SerializableSerializer extends Serializer<Object> { + @Override + public void write(Kryo kryo, Output output, Object object) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(object); + oos.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + byte[] ser = bos.toByteArray(); + output.writeInt(ser.length); + output.writeBytes(ser); + } + + @SuppressWarnings("rawtypes") + @Override + public Object read(Kryo kryo, Input input, Class c) { + int len = input.readInt(); + byte[] ser = new byte[len]; + input.readBytes(ser); + ByteArrayInputStream bis = new ByteArrayInputStream(ser); + try { + ObjectInputStream ois = new ObjectInputStream(bis); + return ois.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java new file mode 100644 index 0000000..6c369ae --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java @@ -0,0 +1,386 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the 
Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.FileSystems; +import java.util.Map; +import java.util.Properties; + +import kafka.admin.AdminUtils; +import kafka.utils.ZKStringSerializer; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities methods for: - Kafka - HDFS - Handling files on local FS + * + * @author Anh Thu Vu + */ +public class SystemsUtils { + private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class); + + public static final String HDFS = "hdfs"; + public static final String LOCAL_FS = "local"; + + private static final String TEMP_FILE = "samoaTemp"; + private static final String TEMP_FILE_SUFFIX = ".dat"; + + /* + * Kafka + */ + private static class KafkaUtils { + private static ZkClient zkClient; + + static void setZookeeper(String zk) { + zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper()); + } + + /* + * Create Kafka 
topic/stream + */ + static void createKafkaTopic(String name, int partitions, int replicas) { + AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties()); + } + + static class ZKStringSerializerWrapper implements ZkSerializer { + @Override + public Object deserialize(byte[] byteArray) throws ZkMarshallingError { + return ZKStringSerializer.deserialize(byteArray); + } + + @Override + public byte[] serialize(Object obj) throws ZkMarshallingError { + return ZKStringSerializer.serialize(obj); + } + } + } + + /* + * HDFS + */ + private static class HDFSUtils { + private static String coreConfPath; + private static String hdfsConfPath; + private static String configHomePath; + private static String samoaDir = null; + + static void setHadoopConfigHome(String hadoopConfPath) { + logger.info("Hadoop config home:{}", hadoopConfPath); + configHomePath = hadoopConfPath; + java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml"); + java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml"); + coreConfPath = coreSitePath.toAbsolutePath().toString(); + hdfsConfPath = hdfsSitePath.toAbsolutePath().toString(); + } + + static String getNameNodeUri() { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + return config.get("fs.defaultFS"); + } + + static String getHadoopConfigHome() { + return configHomePath; + } + + static void setSAMOADir(String dir) { + if (dir != null) + samoaDir = getNameNodeUri() + dir; + else + samoaDir = null; + } + + static String getDefaultSAMOADir() throws IOException { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + FileSystem fs = FileSystem.get(config); + Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa"); + return defaultDir.toString(); + } + + static boolean 
deleteFileIfExist(String absPath) { + Path p = new Path(absPath); + return deleteFileIfExist(p); + } + + static boolean deleteFileIfExist(Path p) { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + FileSystem fs; + try { + fs = FileSystem.get(config); + if (fs.exists(p)) { + return fs.delete(p, false); + } + else + return true; + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return false; + } + + /* + * Write to HDFS + */ + static String writeToHDFS(File file, String dstPath) { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + logger.info("Filesystem name:{}", config.get("fs.defaultFS")); + + // Default samoaDir + if (samoaDir == null) { + try { + samoaDir = getDefaultSAMOADir(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + // Setup src and dst paths + // java.nio.file.Path tempPath = + // FileSystems.getDefault().getPath(samoaDir, dstPath); + Path dst = new Path(samoaDir, dstPath); + Path src = new Path(file.getAbsolutePath()); + + // Delete file if already exists in HDFS + if (deleteFileIfExist(dst) == false) + return null; + + // Copy to HDFS + FileSystem fs; + try { + fs = FileSystem.get(config); + fs.copyFromLocalFile(src, dst); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + return dst.toString(); // abs path to file + } + + /* + * Read from HDFS + */ + static Object deserializeObjectFromFile(String filePath) { + logger.info("Deserialize HDFS file:{}", filePath); + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + Path file = new Path(filePath); + FSDataInputStream dataInputStream = null; + ObjectInputStream ois = null; + Object obj = null; + FileSystem fs; + try { + fs = 
FileSystem.get(config); + dataInputStream = fs.open(file); + ois = new ObjectInputStream(dataInputStream); + obj = ois.readObject(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ClassNotFoundException e) { + try { + if (dataInputStream != null) + dataInputStream.close(); + if (ois != null) + ois.close(); + } catch (IOException ioException) { + // TODO auto-generated catch block + e.printStackTrace(); + } + } + + return obj; + } + + } + + private static class LocalFileSystemUtils { + static boolean serializObjectToFile(Object obj, String fn) { + FileOutputStream fos = null; + ObjectOutputStream oos = null; + try { + fos = new FileOutputStream(fn); + oos = new ObjectOutputStream(fos); + oos.writeObject(obj); + } catch (FileNotFoundException e) { + e.printStackTrace(); + return false; + } catch (IOException e) { + e.printStackTrace(); + return false; + } finally { + try { + if (fos != null) + fos.close(); + if (oos != null) + oos.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + return true; + } + + static Object deserializeObjectFromLocalFile(String filename) { + logger.info("Deserialize local file:{}", filename); + FileInputStream fis = null; + ObjectInputStream ois = null; + Object obj = null; + try { + fis = new FileInputStream(filename); + ois = new ObjectInputStream(fis); + obj = ois.readObject(); + } catch (IOException e) { + // TODO auto-generated catch block + e.printStackTrace(); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + try { + if (fis != null) + fis.close(); + if (ois != null) + ois.close(); + } catch (IOException e) { + // TODO auto-generated catch block + e.printStackTrace(); + } + } + + return obj; + } + } + + /* + * Create streams + */ + public static void createKafkaTopic(String name, int partitions) { + createKafkaTopic(name, partitions, 1); + } + + public static void createKafkaTopic(String name, int 
partitions, int replicas) { + KafkaUtils.createKafkaTopic(name, partitions, replicas); + } + + /* + * Serialize object + */ + public static boolean serializeObjectToLocalFileSystem(Object object, String path) { + return LocalFileSystemUtils.serializObjectToFile(object, path); + } + + public static String serializeObjectToHDFS(Object object, String path) { + File tmpDatFile; + try { + tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX); + if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) { + return HDFSUtils.writeToHDFS(tmpDatFile, path); + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + } + + /* + * Deserialize object + */ + @SuppressWarnings("unchecked") + public static Map<String, Object> deserializeMapFromFile(String filesystem, String filename) { + Map<String, Object> map; + if (filesystem.equals(HDFS)) { + map = (Map<String, Object>) HDFSUtils.deserializeObjectFromFile(filename); + } + else { + map = (Map<String, Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename); + } + return map; + } + + public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) { + Map<String, Object> map = deserializeMapFromFile(filesystem, filename); + if (map == null) + return null; + return map.get(key); + } + + /* + * Setup + */ + public static void setZookeeper(String zookeeper) { + KafkaUtils.setZookeeper(zookeeper); + } + + public static void setHadoopConfigHome(String hadoopHome) { + HDFSUtils.setHadoopConfigHome(hadoopHome); + } + + public static void setSAMOADir(String samoaDir) { + HDFSUtils.setSAMOADir(samoaDir); + } + + /* + * Others + */ + public static String getHDFSNameNodeUri() { + return HDFSUtils.getNameNodeUri(); + } + + public static String getHadoopConfigHome() { + return HDFSUtils.getHadoopConfigHome(); + } + + public static String copyToHDFS(File file, String dstPath) { + return HDFSUtils.writeToHDFS(file, 
dstPath); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/resources/log4j.xml b/samoa-samza/src/main/resources/log4j.xml index 1704755..5862b88 100644 --- a/samoa-samza/src/main/resources/log4j.xml +++ b/samoa-samza/src/main/resources/log4j.xml @@ -51,7 +51,7 @@ </layout> </appender> - <category name="com.yahoo.labs" additivity="false"> + <category name="org.apache.samoa" additivity="false"> <priority value="info" /> <appender-ref ref="CONSOLE" /> </category> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-storm/pom.xml b/samoa-storm/pom.xml index abd351b..a5d1012 100644 --- a/samoa-storm/pom.xml +++ b/samoa-storm/pom.xml @@ -31,7 +31,7 @@ <artifactId>samoa-storm</artifactId> <parent> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa</artifactId> <version>0.3.0-SNAPSHOT</version> </parent> @@ -45,12 +45,12 @@ <dependencies> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-api</artifactId> <version>${project.version}</version> </dependency> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-test</artifactId> <type>test-jar</type> <classifier>test-jar-with-dependencies</classifier> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java deleted file mode 100644 index a5e1cdd..0000000 --- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.yahoo.labs.samoa; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; - -import com.yahoo.labs.samoa.topology.impl.StormSamoaUtils; -import com.yahoo.labs.samoa.topology.impl.StormTopology; - -/** - * The main class to execute a SAMOA task in LOCAL mode in Storm. - * - * @author Arinto Murdopo - * - */ -public class LocalStormDoTask { - - private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class); - - /** - * The main method. 
- * - * @param args - * the arguments - */ - public static void main(String[] args) { - - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - - int numWorker = StormSamoaUtils.numWorkers(tmpArgs); - - args = tmpArgs.toArray(new String[0]); - - // convert the arguments into Storm topology - StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); - String topologyName = stormTopo.getTopologyName(); - - Config conf = new Config(); - // conf.putAll(Utils.readStormConfig()); - conf.setDebug(false); - - // local mode - conf.setMaxTaskParallelism(numWorker); - - backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); - cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); - - backtype.storm.utils.Utils.sleep(600 * 1000); - - cluster.killTopology(topologyName); - cluster.shutdown(); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java deleted file mode 100644 index d879171..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import backtype.storm.task.OutputCollector; -import backtype.storm.tuple.Values; -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class - * - * @author Arinto Murdopo - * - */ -class StormBoltStream extends StormStream { - - /** - * - */ - private static final long serialVersionUID = -5712513402991550847L; - - private OutputCollector outputCollector; - - StormBoltStream(String stormComponentId) { - super(stormComponentId); - } - - @Override - public void put(ContentEvent contentEvent) { - outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey())); - } - - public void setCollector(OutputCollector outputCollector) { - this.outputCollector = outputCollector; - } - - // @Override - // public void setStreamId(String streamId) { - // // TODO Auto-generated method stub - // //this.outputStreamId = streamId; - // } - - @Override - public String getStreamId() { - // TODO Auto-generated method stub - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java deleted file mode 100644 index 0537c3f..0000000 --- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.HashMap; -import java.util.Map; - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * Component factory implementation for samoa-storm - */ -public final class StormComponentFactory implements ComponentFactory { - - private final Map<String, Integer> processorList; - - public StormComponentFactory() { - processorList = new HashMap<>(); - } - - @Override - public ProcessingItem createPi(Processor processor) { - return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), 1); - } - - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - return new StormEntranceProcessingItem(processor, this.getComponentName(processor.getClass())); - } - - @Override - public Stream createStream(IProcessingItem 
sourcePi) { - StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi; - return stormCompatiblePi.createStream(); - } - - @Override - public Topology createTopology(String topoName) { - return new StormTopology(topoName); - } - - private String getComponentName(Class<? extends Processor> clazz) { - StringBuilder componentName = new StringBuilder(clazz.getCanonicalName()); - String key = componentName.toString(); - Integer index; - - if (!processorList.containsKey(key)) { - index = 1; - } else { - index = processorList.get(key) + 1; - } - - processorList.put(key, index); - - componentName.append('_'); - componentName.append(index); - - return componentName.toString(); - } - - @Override - public ProcessingItem createPi(Processor processor, int parallelism) { - return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java deleted file mode 100644 index 41ee276..0000000 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java +++ /dev/null @@ -1,118 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; - -/** - * The main class that used by samoa script to execute SAMOA task. - * - * @author Arinto Murdopo - * - */ -public class StormDoTask { - private static final Logger logger = LoggerFactory.getLogger(StormDoTask.class); - private static String localFlag = "local"; - private static String clusterFlag = "cluster"; - - /** - * The main method. - * - * @param args - * the arguments - */ - public static void main(String[] args) { - - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - - boolean isLocal = isLocal(tmpArgs); - int numWorker = StormSamoaUtils.numWorkers(tmpArgs); - - args = tmpArgs.toArray(new String[0]); - - // convert the arguments into Storm topology - StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); - String topologyName = stormTopo.getTopologyName(); - - Config conf = new Config(); - conf.putAll(Utils.readStormConfig()); - conf.setDebug(false); - - if (isLocal) { - // local mode - conf.setMaxTaskParallelism(numWorker); - - backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); - cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); - - backtype.storm.utils.Utils.sleep(600 * 1000); - - cluster.killTopology(topologyName); - cluster.shutdown(); - - } else { - // cluster mode - conf.setNumWorkers(numWorker); - try { - 
backtype.storm.StormSubmitter.submitTopology(topologyName, conf, - stormTopo.getStormBuilder().createTopology()); - } catch (backtype.storm.generated.AlreadyAliveException ale) { - ale.printStackTrace(); - } catch (backtype.storm.generated.InvalidTopologyException ite) { - ite.printStackTrace(); - } - } - } - - private static boolean isLocal(List<String> tmpArgs) { - ExecutionMode executionMode = ExecutionMode.UNDETERMINED; - - int position = tmpArgs.size() - 1; - String flag = tmpArgs.get(position); - boolean isLocal = true; - - if (flag.equals(clusterFlag)) { - executionMode = ExecutionMode.CLUSTER; - isLocal = false; - } else if (flag.equals(localFlag)) { - executionMode = ExecutionMode.LOCAL; - isLocal = true; - } - - if (executionMode != ExecutionMode.UNDETERMINED) { - tmpArgs.remove(position); - } - - return isLocal; - } - - private enum ExecutionMode { - LOCAL, CLUSTER, UNDETERMINED - }; -}
