package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 - 2014 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.yahoo.labs.samoa.topology.IProcessingItem;

/**
 * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem.
 *
 * <p>It extends {@link IProcessingItem} with the two facilities the Samza
 * binding needs: wiring output streams to a PI, and a mutable name/id that
 * is used both as the Samza job name and as the prefix for stream/topic
 * names (see SamzaStream's constructor and SamzaTopology#addProcessingItem).
 *
 * @author Anh Thu Vu
 */
public interface SamzaProcessingNode extends IProcessingItem {
    /**
     * Registers an output stream with this processing item.
     *
     * @param stream
     *            the output stream
     * @return the number of output streams of this processing item
     *         (used by the caller as a 0-based-index-plus-one suffix
     *         when generating the stream id)
     */
    // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI
    public int addOutputStream(SamzaStream stream);

    /**
     * Gets the name/id of this processing item.
     *
     * @return the name/id of this processing item
     */
    // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI
    public String getName();

    /**
     * Sets the name/id for this processing item.
     *
     * @param name
     *            the name/id of this processing item
     */
    // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI
    public void setName(String name);
}
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.topology.IProcessingItem;
import com.yahoo.labs.samoa.topology.AbstractStream;
import com.yahoo.labs.samoa.utils.PartitioningScheme;
import com.yahoo.labs.samoa.utils.StreamDestination;

/**
 * Stream for SAMOA on Samza.
 *
 * <p>A SamzaStream fans a {@link ContentEvent} out to one or more
 * {@link SamzaSystemStream}s (one per distinct partitioning-scheme/parallelism
 * combination among its destinations).
 *
 * @author Anh Thu Vu
 */
public class SamzaStream extends AbstractStream implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_SYSTEM_NAME = "kafka";
    private static final int DEFAULT_BATCH_SIZE = 1;

    private List<SamzaSystemStream> systemStreams;
    // Provided by the Samza runtime at processing time; not serializable.
    private transient MessageCollector collector;
    private String systemName;
    // Producer batch size. SamzaConfigFactory groups streams with the same
    // batch size into one Kafka system (batchSize > 1 => async producer).
    private int batchSize;

    /**
     * Creates a stream attached to the given source PI and derives the
     * stream id from the PI's name and its output-stream count.
     *
     * @param sourcePi the source processing item; must be a {@link SamzaProcessingNode}
     */
    public SamzaStream(IProcessingItem sourcePi) {
        super(sourcePi);
        this.systemName = DEFAULT_SYSTEM_NAME;
        this.batchSize = DEFAULT_BATCH_SIZE;
        // Get name/id for this stream
        SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
        int index = samzaPi.addOutputStream(this);
        this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index));
        // init list of SamzaSystemStream
        systemStreams = new ArrayList<SamzaSystemStream>();
    }

    /**
     * Sets the Samza system name (Kafka) for this stream and propagates it
     * to all already-registered system streams.
     *
     * @param systemName the system name
     */
    public void setSystemName(String systemName) {
        this.systemName = systemName;
        for (SamzaSystemStream systemStream : systemStreams) {
            systemStream.setSystem(systemName);
        }
    }

    public String getSystemName() {
        return this.systemName;
    }

    /**
     * Sets the producer batch size (default 1, i.e. sync producer).
     *
     * @param batchSize the batch size, read by SamzaConfigFactory
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * @return the producer batch size used when configuring the Kafka system
     *         for this stream (SamzaConfigFactory calls this)
     */
    public int getBatchSize() {
        return this.batchSize;
    }

    /**
     * Adds the destination PI to this stream, reusing an existing
     * SamzaSystemStream when one with the same partitioning scheme and
     * parallelism already exists.
     *
     * @param destination the destination descriptor
     * @return the (possibly newly created) matching SamzaSystemStream
     */
    public SamzaSystemStream addDestination(StreamDestination destination) {
        PartitioningScheme scheme = destination.getPartitioningScheme();
        int parallelism = destination.getParallelism();

        SamzaSystemStream resultStream = null;
        for (int i = 0; i < systemStreams.size(); i++) {
            if (systemStreams.get(i).isSame(scheme, parallelism)) {
                // An existing SystemStream matches the settings: reuse it.
                resultStream = systemStreams.get(i);
                break; // first match wins; no need to keep scanning
            }
        }

        // No existing SystemStream matches the requirement: create a new one.
        if (resultStream == null) {
            String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size());
            resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism);
            systemStreams.add(resultStream);
        }

        return resultStream;
    }

    public void setCollector(MessageCollector collector) {
        this.collector = collector;
    }

    public MessageCollector getCollector() {
        return this.collector;
    }

    /**
     * Materializes the actual Samza SystemStreams; called once the topology
     * is submitted (the transient parts are not available before that).
     */
    public void onCreate() {
        for (SamzaSystemStream stream : systemStreams) {
            stream.initSystemStream();
        }
    }

    /**
     * Sends the event to every registered system stream.
     * Implements the Stream interface.
     */
    @Override
    public void put(ContentEvent event) {
        for (SamzaSystemStream stream : systemStreams) {
            stream.send(collector, event);
        }
    }

    public List<SamzaSystemStream> getSystemStreams() {
        return this.systemStreams;
    }

    /**
     * SamzaSystemStream wraps a Samza SystemStream.
     * It carries the info needed to create a Samza stream while the topology
     * is being constructed, and creates the actual Samza stream when the
     * topology is submitted (via initSystemStream()).
     *
     * @author Anh Thu Vu
     */
    public static class SamzaSystemStream implements Serializable {

        private static final long serialVersionUID = 1L;

        private String system;
        private String stream;
        private PartitioningScheme scheme;
        private int parallelism;

        // Created lazily on the submitted topology; not serializable.
        private transient SystemStream actualSystemStream = null;

        /*
         * Constructors
         */
        public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) {
            this.system = system;
            this.stream = stream;
            this.scheme = scheme;
            this.parallelism = parallelism;
        }

        public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) {
            this(system, stream, scheme, 1);
        }

        /*
         * Setters
         */
        public void setSystem(String system) {
            this.system = system;
        }

        /*
         * Getters
         */
        public String getSystem() {
            return this.system;
        }

        public String getStream() {
            return this.stream;
        }

        public PartitioningScheme getPartitioningScheme() {
            return this.scheme;
        }

        public int getParallelism() {
            return this.parallelism;
        }

        /**
         * @return true iff this stream already uses the given scheme and
         *         parallelism (enum identity comparison is intentional)
         */
        public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
            return (this.scheme == scheme && this.parallelism == parallelismHint);
        }

        /**
         * Initializes the actual Samza SystemStream.
         */
        public void initSystemStream() {
            actualSystemStream = new SystemStream(this.system, this.stream);
        }

        /**
         * Sends a ContentEvent, dispatching on the partitioning scheme.
         * Initializes the underlying SystemStream lazily if needed.
         */
        public void send(MessageCollector collector, ContentEvent contentEvent) {
            if (actualSystemStream == null)
                this.initSystemStream();

            switch (this.scheme) {
            case SHUFFLE:
                this.sendShuffle(collector, contentEvent);
                break;
            case GROUP_BY_KEY:
                this.sendGroupByKey(collector, contentEvent);
                break;
            case BROADCAST:
                this.sendBroadcast(collector, contentEvent);
                break;
            }
        }

        /*
         * Helpers.
         * NOTE(review): sendShuffle and sendBroadcast are synchronized but
         * sendGroupByKey is not — presumably intentional, but worth confirming
         * against the threading model of the calling Samza task.
         */
        private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) {
            // No partition key: partition assignment is left to the producer.
            collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event));
        }

        private void sendGroupByKey(MessageCollector collector, ContentEvent event) {
            // Partition by the event key so identical keys land together.
            collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
        }

        private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) {
            // One copy per partition, keyed by partition index.
            for (int i = 0; i < parallelism; i++) {
                collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
            }
        }
    }
}
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 - 2014 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.HashSet;
import java.util.Set;

import com.yahoo.labs.samoa.topology.IProcessingItem;
import com.yahoo.labs.samoa.topology.AbstractTopology;

/**
 * Topology for Samza.
 *
 * <p>Assigns each added PI a unique name of the form
 * {@code <topologyName>-<counter>}; that name becomes the Samza job name.
 *
 * @author Anh Thu Vu
 */
public class SamzaTopology extends AbstractTopology {
    // Monotonic counter used to generate unique PI names.
    private int procItemCounter;

    public SamzaTopology(String topoName) {
        super(topoName);
        procItemCounter = 0;
    }

    @Override
    public void addProcessingItem(IProcessingItem procItem, int parallelism) {
        super.addProcessingItem(procItem, parallelism);
        SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem;
        samzaPi.setName(this.getTopologyName() + "-" + Integer.toString(procItemCounter));
        procItemCounter++;
    }

    /**
     * Gets the set of ProcessingItems, excluding EntrancePIs.
     * Used by SamzaConfigFactory as the configs for EntrancePIs and
     * normal PIs are different.
     *
     * @return the processing items that are not entrance processing items
     * @throws Exception if entrance PIs exist but none could be removed
     *         from the copied set (indicates an inconsistent topology)
     */
    public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception {
        Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>();
        copiedSet.addAll(this.getProcessingItems());
        boolean result = copiedSet.removeAll(this.getEntranceProcessingItems());
        // removeAll returns false when nothing was removed; that is only an
        // error if there actually were entrance PIs to remove. (The original
        // check threw spuriously for a topology with no entrance PIs.)
        if (!result && !this.getEntranceProcessingItems().isEmpty()) {
            throw new Exception("Failed extracting the set of non-entrance processing items");
        }
        return copiedSet;
    }
}
package com.yahoo.labs.samoa.utils;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 - 2014 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.local.LocalJobFactory;
import org.apache.samza.job.yarn.YarnJobFactory;
import org.apache.samza.system.kafka.KafkaSystemFactory;

import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
import com.yahoo.labs.samoa.topology.IProcessingItem;
import com.yahoo.labs.samoa.topology.Stream;
import com.yahoo.labs.samoa.topology.impl.SamoaSystemFactory;
import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem;
import com.yahoo.labs.samoa.topology.impl.SamzaProcessingItem;
import com.yahoo.labs.samoa.topology.impl.SamzaStream;
import com.yahoo.labs.samoa.topology.impl.SamzaTopology;
import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;

/**
 * Generates Configs that will be used to submit Samza jobs from the input
 * topology (one config per PI/EntrancePI in the topology).
 *
 * @author Anh Thu Vu
 */
public class SamzaConfigFactory {

    public static final String SYSTEM_NAME = "samoa";

    // DEFAULT VALUES
    private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
    private static final String DEFAULT_BROKER_LIST = "localhost:9092";

    // DELIMINATORS
    public static final String COMMA = ",";
    public static final String COLON = ":";
    public static final String DOT = ".";
    public static final char DOLLAR_SIGN = '$';
    public static final char QUESTION_MARK = '?';

    // PARTITIONING SCHEMES
    public static final String SHUFFLE = "shuffle";
    public static final String KEY = "key";
    public static final String BROADCAST = "broadcast";

    // PROPERTY KEYS
    // JOB
    public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
    public static final String JOB_NAME_KEY = "job.name";
    // YARN
    public static final String YARN_PACKAGE_KEY = "yarn.package.path";
    public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb";
    public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb";
    public static final String CONTAINER_COUNT_KEY = "yarn.container.count";
    // TASK (SAMZA original)
    public static final String TASK_CLASS_KEY = "task.class";
    public static final String TASK_INPUTS_KEY = "task.inputs";
    // TASK (extra)
    public static final String FILE_KEY = "task.processor.file";
    public static final String FILESYSTEM_KEY = "task.processor.filesystem";
    public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
    public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs";
    public static final String YARN_CONF_HOME_KEY = "yarn.config.home";
    // KAFKA
    public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect";
    public static final String BROKER_URI_KEY = "producer.metadata.broker.list";
    public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages";
    public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type";
    // SERDE
    public static final String SERDE_REGISTRATION_KEY = "kryo.register";

    // Instance variables
    private boolean isLocalMode;
    private String zookeeper;
    private String kafkaBrokerList;
    private int replicationFactor;
    private int amMemory;
    private int containerMemory;
    private int piPerContainerRatio;
    private int checkpointFrequency; // in ms

    private String jarPath;
    private String kryoRegisterFile = null;

    public SamzaConfigFactory() {
        this.isLocalMode = false;
        this.zookeeper = DEFAULT_ZOOKEEPER;
        this.kafkaBrokerList = DEFAULT_BROKER_LIST;
        this.checkpointFrequency = 60000; // default: 1 minute
        this.replicationFactor = 1;
    }

    /*
     * Setter methods (fluent; each returns this for chaining)
     */
    public SamzaConfigFactory setYarnPackage(String packagePath) {
        this.jarPath = packagePath;
        return this;
    }

    public SamzaConfigFactory setLocalMode(boolean isLocal) {
        this.isLocalMode = isLocal;
        return this;
    }

    public SamzaConfigFactory setZookeeper(String zk) {
        this.zookeeper = zk;
        return this;
    }

    public SamzaConfigFactory setKafka(String brokerList) {
        this.kafkaBrokerList = brokerList;
        return this;
    }

    public SamzaConfigFactory setCheckpointFrequency(int freq) {
        this.checkpointFrequency = freq;
        return this;
    }

    public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
        this.replicationFactor = replicationFactor;
        return this;
    }

    public SamzaConfigFactory setAMMemory(int mem) {
        this.amMemory = mem;
        return this;
    }

    public SamzaConfigFactory setContainerMemory(int mem) {
        this.containerMemory = mem;
        return this;
    }

    public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
        this.piPerContainerRatio = piPerContainer;
        return this;
    }

    public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
        this.kryoRegisterFile = kryoRegister;
        return this;
    }

    /**
     * Generates a map of all config properties for the input SamzaProcessingItem.
     *
     * @param pi the processing item
     * @param filename the file with the serialized processors
     * @param filesystem the filesystem the file lives on (local/HDFS)
     * @return the property map for this PI's Samza job
     */
    private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception {
        Map<String, String> map = getBasicSystemConfig();

        // Set job name, task class, task inputs (from SamzaProcessingItem)
        setJobName(map, pi.getName());
        setTaskClass(map, SamzaProcessingItem.class.getName());

        // Comma-separated "<system>.<stream>" list of all input streams
        StringBuilder streamNames = new StringBuilder();
        boolean first = true;
        for (SamzaSystemStream stream : pi.getInputStreams()) {
            if (!first)
                streamNames.append(COMMA);
            streamNames.append(stream.getSystem() + DOT + stream.getStream());
            first = false;
        }
        setTaskInputs(map, streamNames.toString());

        // Processor file
        setFileName(map, filename);
        setFileSystem(map, filesystem);

        // Default kafka system "kafka0": sync producer.
        // This system is always required: it is used for checkpointing.
        Set<String> systemNames = new HashSet<String>();
        systemNames.add("kafka0");
        setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);
        // Output streams: register each distinct Kafka system once
        for (SamzaStream stream : pi.getOutputStreams()) {
            if (systemNames.add(stream.getSystemName())) {
                setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize());
            }
        }
        // Input streams: register each distinct Kafka system once
        for (SamzaSystemStream stream : pi.getInputStreams()) {
            if (systemNames.add(stream.getSystem())) {
                setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1);
            }
        }

        // Checkpointing
        setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
        setValue(map, "task.checkpoint.system", "kafka0");
        setValue(map, "task.commit.ms", "1000");
        setValue(map, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor));

        // Number of containers
        setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);

        return map;
    }

    /**
     * Generates a map of all config properties for the input
     * SamzaEntranceProcessingItem.
     *
     * @param epi the entrance processing item
     * @param filename the file with the serialized processors
     * @param filesystem the filesystem the file lives on (local/HDFS)
     * @return the property map for this EPI's Samza job
     */
    public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) {
        Map<String, String> map = getBasicSystemConfig();

        // Set job name, task class (from SamzaEntranceProcessingItem)
        setJobName(map, epi.getName());
        setTaskClass(map, SamzaEntranceProcessingItem.class.getName());

        // Input for the entrance task (from our custom consumer)
        setTaskInputs(map, SYSTEM_NAME + "." + epi.getName());

        // Output from entrance task.
        // Since an entrance PI should have only 1 output stream, there is no
        // need for checking the batch size or setting different system names.
        // The custom consumer (samoa system) does not support reading from a
        // specific index => no need for checkpointing.
        SamzaStream outputStream = (SamzaStream) epi.getOutputStream();
        // Set samoa system factory
        setValue(map, "systems." + SYSTEM_NAME + ".samza.factory", SamoaSystemFactory.class.getName());
        // Set Kafka system (only if there is an output stream)
        if (outputStream != null)
            setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList, outputStream.getBatchSize());

        // Processor file
        setFileName(map, filename);
        setFileSystem(map, filesystem);

        // Number of containers
        setNumberOfContainers(map, 1, this.piPerContainerRatio);

        return map;
    }

    /**
     * Generates a list of maps (of config properties) for all PIs and EPIs in
     * the input topology. Also serializes the PIs to a file that the Samza
     * tasks deserialize at startup.
     *
     * @throws Exception if serializing the PI map fails
     */
    public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) throws Exception {

        List<Map<String, String>> maps = new ArrayList<Map<String, String>>();

        // File to write serialized objects
        String filename = topology.getTopologyName() + ".dat";
        Path dirPath = FileSystems.getDefault().getPath("dat");
        Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename);
        String dstPath = filePath.toString();
        String resPath;
        String filesystem;
        if (this.isLocalMode) {
            filesystem = SystemsUtils.LOCAL_FS;
            File dir = dirPath.toFile();
            if (!dir.exists())
                FileUtils.forceMkdir(dir);
        } else {
            filesystem = SystemsUtils.HDFS;
        }

        // Correct system name for streams
        this.setSystemNameForStreams(topology.getStreams());

        // Add all PIs to a collection (map), keyed by PI name
        Map<String, Object> piMap = new HashMap<String, Object>();
        Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems();
        Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems();
        for (EntranceProcessingItem epi : entranceProcessingItems) {
            SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
            piMap.put(sepi.getName(), sepi);
        }
        for (IProcessingItem pi : processingItems) {
            SamzaProcessingItem spi = (SamzaProcessingItem) pi;
            piMap.put(spi.getName(), spi);
        }

        // Serialize all PIs
        boolean serialized = false;
        if (this.isLocalMode) {
            serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath);
            resPath = dstPath;
        } else {
            resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath);
            serialized = resPath != null;
        }

        if (!serialized) {
            throw new Exception("Fail serialize map of PIs to file");
        }

        // MapConfig for all PIs
        for (EntranceProcessingItem epi : entranceProcessingItems) {
            SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
            maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem));
        }
        for (IProcessingItem pi : processingItems) {
            SamzaProcessingItem spi = (SamzaProcessingItem) pi;
            maps.add(this.getMapForPI(spi, resPath, filesystem));
        }

        return maps;
    }

    /**
     * Constructs a list of MapConfigs for a Topology.
     *
     * @return the list of MapConfigs
     * @throws Exception if serializing the topology's PIs fails
     */
    public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception {
        List<MapConfig> configs = new ArrayList<MapConfig>();
        List<Map<String, String>> maps = this.getMapsForTopology(topology);
        for (Map<String, String> map : maps) {
            configs.add(new MapConfig(map));
        }
        return configs;
    }

    /**
     * Assigns a Kafka system name to each stream: streams with the same
     * producer batch size share one system ("kafka0" is reserved for batch
     * size 1, i.e. the sync producer).
     */
    public void setSystemNameForStreams(Set<Stream> streams) {
        Map<Integer, String> batchSizeMap = new HashMap<Integer, String>();
        batchSizeMap.put(1, "kafka0"); // default system with sync producer
        int counter = 0;
        for (Stream stream : streams) {
            SamzaStream samzaStream = (SamzaStream) stream;
            String systemName = batchSizeMap.get(samzaStream.getBatchSize());
            if (systemName == null) {
                // First stream seen with this batch size: add a new system
                counter++;
                systemName = "kafka" + Integer.toString(counter);
                batchSizeMap.put(samzaStream.getBatchSize(), systemName);
            }
            samzaStream.setSystemName(systemName);
        }
    }

    /**
     * Generates a map with common properties for PIs and EPIs:
     * job factory (local or YARN), placeholder job/task keys, and serde
     * registration.
     */
    private Map<String, String> getBasicSystemConfig() {
        Map<String, String> map = new HashMap<String, String>();
        // Job & Task
        if (this.isLocalMode)
            map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName());
        else {
            map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName());

            // yarn
            map.put(YARN_PACKAGE_KEY, jarPath);
            map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory));
            map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
            map.put(CONTAINER_COUNT_KEY, "1");
            map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome());

            // Task opts (Heap size = 0.75 container memory)
            int heapSize = (int) (0.75 * this.containerMemory);
            map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M -XX:+PrintGCDateStamps");
        }

        // Placeholders, overwritten later by the per-PI setters
        map.put(JOB_NAME_KEY, "");
        map.put(TASK_CLASS_KEY, "");
        map.put(TASK_INPUTS_KEY, "");

        // register serializer
        map.put("serializers.registry.kryo.class", SamzaKryoSerdeFactory.class.getName());

        // Serde registration
        setKryoRegistration(map, this.kryoRegisterFile);

        return map;
    }

    /*
     * Helper methods to set different properties in the input map
     */
    private static void setJobName(Map<String, String> map, String jobName) {
        map.put(JOB_NAME_KEY, jobName);
    }

    private static void setFileName(Map<String, String> map, String filename) {
        map.put(FILE_KEY, filename);
    }

    private static void setFileSystem(Map<String, String> map, String filesystem) {
        map.put(FILESYSTEM_KEY, filesystem);
    }

    private static void setTaskClass(Map<String, String> map, String taskClass) {
        map.put(TASK_CLASS_KEY, taskClass);
    }

    private static void setTaskInputs(Map<String, String> map, String inputs) {
        map.put(TASK_INPUTS_KEY, inputs);
    }

    private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) {
        if (kryoRegisterFile != null) {
            String value = readKryoRegistration(kryoRegisterFile);
            map.put(SERDE_REGISTRATION_KEY, value);
        }
    }

    // Containers = ceil(parallelism / piPerContainer)
    private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) {
        int res = parallelism / piPerContainer;
        if (parallelism % piPerContainer != 0)
            res++;
        map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
    }

    // Registers a Kafka system under "systems.<name>.*"; batchSize > 1
    // switches the producer to async mode.
    private static void setKafkaSystem(Map<String, String> map, String systemName, String zk, String brokers, int batchSize) {
        map.put("systems." + systemName + ".samza.factory", KafkaSystemFactory.class.getName());
        map.put("systems." + systemName + ".samza.msg.serde", "kryo");

        map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk);
        map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers);
        map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, Integer.toString(batchSize));

        map.put("systems." + systemName + ".samza.offset.default", "oldest");

        if (batchSize > 1) {
            map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "async");
        } else {
            map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync");
        }
    }

    // Set custom properties
    private static void setValue(Map<String, String> map, String key, String value) {
        map.put(key, value);
    }

    /**
     * Parses a Kryo registration properties file into the comma-separated
     * "class[:serializer]" format stored under SERDE_REGISTRATION_KEY.
     * Best-effort: on I/O error, logs the stack trace and returns whatever
     * was accumulated so far (possibly empty), matching the original behavior.
     */
    private static String readKryoRegistration(String filePath) {
        Properties props = new Properties();
        StringBuilder result = new StringBuilder();
        try (FileInputStream fis = new FileInputStream(filePath)) {
            props.load(fis);

            boolean first = true;
            String value = null;
            for (String k : props.stringPropertyNames()) {
                if (!first)
                    result.append(COMMA);
                else
                    first = false;

                // Need to avoid the dollar sign as Samza passes all the
                // properties in the config to containers via command-line
                // parameters/environment variables. We might escape the dollar
                // sign, but it's more complicated than replacing it with
                // something else (it is decoded back in SamzaKryoSerdeFactory).
                result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
                value = props.getProperty(k);
                if (value != null && value.trim().length() > 0) {
                    result.append(COLON);
                    result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
                }
            }
        } catch (IOException e) {
            // FileNotFoundException is an IOException; keep the original
            // best-effort behavior of printing and returning a partial result.
            e.printStackTrace();
        }

        return result.toString();
    }
}
package com.yahoo.labs.samoa.utils;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 - 2014 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.ByteArrayOutputStream;

import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

/**
 * Implementation of Samza's SerdeFactory that uses Kryo to
 * serialize/deserialize objects.
 *
 * <p>The registration string (from SamzaConfigFactory.SERDE_REGISTRATION_KEY)
 * is a comma-separated list of "class[:serializerClass]" entries; '?' stands
 * for '$' in class names (encoded by SamzaConfigFactory to survive being
 * passed through command-line/environment config).
 *
 * @author Anh Thu Vu
 * @param <T> the type handled by the serde
 */
public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> {

    private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);

    public static class SamzaKryoSerde<V> implements Serde<V> {
        private Kryo kryo;

        public SamzaKryoSerde(String registrationInfo) {
            this.kryo = new Kryo();
            this.register(registrationInfo);
        }

        /**
         * Registers classes (and optional custom serializers) parsed from
         * the registration string. Unresolvable classes are logged and
         * skipped rather than aborting registration of the rest.
         */
        @SuppressWarnings({ "rawtypes", "unchecked" })
        private void register(String registrationInfo) {
            if (registrationInfo == null)
                return;

            String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA);

            Class targetClass = null;
            Class serializerClass = null;
            Serializer serializer = null;

            for (String info : infoList) {
                String[] fields = info.split(SamzaConfigFactory.COLON);

                targetClass = null;
                serializerClass = null;
                if (fields.length > 0) {
                    try {
                        // '?' was substituted for '$' by SamzaConfigFactory; undo it
                        targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, SamzaConfigFactory.DOLLAR_SIGN));
                    } catch (ClassNotFoundException e) {
                        // Log through the class logger instead of printStackTrace,
                        // consistent with the invalid-entry logging below.
                        logger.warn("Class not found for kryo registration: {}", fields[0], e);
                    }
                }
                if (fields.length > 1) {
                    try {
                        serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, SamzaConfigFactory.DOLLAR_SIGN));
                    } catch (ClassNotFoundException e) {
                        logger.warn("Serializer class not found for kryo registration: {}", fields[1], e);
                    }
                }

                if (targetClass != null) {
                    if (serializerClass == null) {
                        kryo.register(targetClass);
                    } else {
                        serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>) serializerClass);
                        kryo.register(targetClass, serializer);
                    }
                } else {
                    logger.info("Invalid registration info:{}", info);
                }
            }
        }

        /**
         * Instantiates a serializer, trying the (Kryo, Class), (Kryo),
         * (Class), then no-arg constructors in that order.
         *
         * @throws IllegalArgumentException if no usable constructor exists
         */
        @SuppressWarnings("rawtypes")
        private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass) {
            try {
                try {
                    return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
                } catch (Exception ex1) {
                    try {
                        return serializerClass.getConstructor(Kryo.class).newInstance(k);
                    } catch (Exception ex2) {
                        try {
                            return serializerClass.getConstructor(Class.class).newInstance(superClass);
                        } catch (Exception ex3) {
                            return serializerClass.newInstance();
                        }
                    }
                }
            } catch (Exception ex) {
                throw new IllegalArgumentException("Unable to create serializer \""
                        + serializerClass.getName()
                        + "\" for class: "
                        + superClass.getName(), ex);
            }
        }

        /*
         * Implement Samza Serde interface
         */
        @Override
        public byte[] toBytes(V obj) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            Output output = new Output(bos);
            kryo.writeClassAndObject(output, obj);
            output.flush();
            output.close();
            return bos.toByteArray();
        }

        @SuppressWarnings("unchecked")
        @Override
        public V fromBytes(byte[] byteArr) {
            Input input = new Input(byteArr);
            Object obj = kryo.readClassAndObject(input);
            input.close();
            return (V) obj;
        }
    }

    @Override
    public Serde<T> getSerde(String name, Config config) {
        return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
    }
}
+ * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Serialize and deserialize objects with Java serialization + * + * @author Anh Thu Vu + */ +public class SerializableSerializer extends Serializer<Object> { + @Override + public void write(Kryo kryo, Output output, Object object) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(object); + oos.flush(); + } catch(IOException e) { + throw new RuntimeException(e); + } + byte[] ser = bos.toByteArray(); + output.writeInt(ser.length); + output.writeBytes(ser); + } + + @SuppressWarnings("rawtypes") + @Override + public Object read(Kryo kryo, Input input, Class c) { + int len = input.readInt(); + byte[] ser = new byte[len]; + input.readBytes(ser); + ByteArrayInputStream bis = new ByteArrayInputStream(ser); + try { + ObjectInputStream ois = new ObjectInputStream(bis); + return ois.readObject(); + } catch(Exception e) { + throw new RuntimeException(e); + } + } + +} 
package com.yahoo.labs.samoa.utils;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 - 2014 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.FileSystems;
import java.util.Map;
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utilities methods for:
 * - Kafka
 * - HDFS
 * - Handling files on local FS
 *
 * @author Anh Thu Vu
 */
public class SystemsUtils {
    private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class);

    public static final String HDFS = "hdfs";
    public static final String LOCAL_FS = "local";

    private static final String TEMP_FILE = "samoaTemp";
    private static final String TEMP_FILE_SUFFIX = ".dat";

    /*
     * Kafka
     */
    private static class KafkaUtils {
        // Shared client; must be initialized via setZookeeper() before topics are created.
        private static ZkClient zkClient;

        static void setZookeeper(String zk) {
            zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper());
        }

        /** Creates a Kafka topic/stream with default topic-level configuration. */
        static void createKafkaTopic(String name, int partitions, int replicas) {
            AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties());
        }

        /** Adapts Kafka's ZKStringSerializer to the zkclient ZkSerializer interface. */
        static class ZKStringSerializerWrapper implements ZkSerializer {
            @Override
            public Object deserialize(byte[] byteArray) throws ZkMarshallingError {
                return ZKStringSerializer.deserialize(byteArray);
            }

            @Override
            public byte[] serialize(Object obj) throws ZkMarshallingError {
                return ZKStringSerializer.serialize(obj);
            }
        }
    }

    /*
     * HDFS
     */
    private static class HDFSUtils {
        private static String coreConfPath;
        private static String hdfsConfPath;
        private static String configHomePath;
        private static String samoaDir = null;

        /** Resolves the core-site.xml and hdfs-site.xml paths under the Hadoop config home. */
        static void setHadoopConfigHome(String hadoopConfPath) {
            logger.info("Hadoop config home:{}", hadoopConfPath);
            configHomePath = hadoopConfPath;
            java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
            java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
            coreConfPath = coreSitePath.toAbsolutePath().toString();
            hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
        }

        /** Builds a Configuration loaded with the resolved core-site/hdfs-site resources. */
        private static Configuration newConfiguration() {
            Configuration config = new Configuration();
            config.addResource(new Path(coreConfPath));
            config.addResource(new Path(hdfsConfPath));
            return config;
        }

        static String getNameNodeUri() {
            return newConfiguration().get("fs.defaultFS");
        }

        static String getHadoopConfigHome() {
            return configHomePath;
        }

        static void setSAMOADir(String dir) {
            if (dir != null)
                samoaDir = getNameNodeUri() + dir;
            else
                samoaDir = null;
        }

        /** Default SAMOA dir: ".samoa" under the HDFS home directory. */
        static String getDefaultSAMOADir() throws IOException {
            FileSystem fs = FileSystem.get(newConfiguration());
            Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa");
            return defaultDir.toString();
        }

        static boolean deleteFileIfExist(String absPath) {
            return deleteFileIfExist(new Path(absPath));
        }

        /**
         * Deletes the file if it exists.
         *
         * @return true if the file is absent or was deleted, false on failure
         */
        static boolean deleteFileIfExist(Path p) {
            try {
                FileSystem fs = FileSystem.get(newConfiguration());
                if (fs.exists(p)) {
                    return fs.delete(p, false);
                }
                return true;
            } catch (IOException e) {
                logger.error("Failed to delete HDFS file:{}", p, e);
                return false;
            }
        }

        /*
         * Write to HDFS
         */
        /**
         * Copies a local file into the SAMOA dir on HDFS, replacing any existing file.
         *
         * @return the absolute HDFS path of the copied file, or null on failure
         */
        static String writeToHDFS(File file, String dstPath) {
            Configuration config = newConfiguration();
            logger.info("Filesystem name:{}", config.get("fs.defaultFS"));

            // Fall back to the default SAMOA dir when none was configured
            if (samoaDir == null) {
                try {
                    samoaDir = getDefaultSAMOADir();
                } catch (IOException e) {
                    logger.error("Could not determine default SAMOA dir", e);
                    return null;
                }
            }

            Path dst = new Path(samoaDir, dstPath);
            Path src = new Path(file.getAbsolutePath());

            // Delete file if already exists in HDFS
            if (!deleteFileIfExist(dst))
                return null;

            try {
                FileSystem fs = FileSystem.get(config);
                fs.copyFromLocalFile(src, dst);
            } catch (IOException e) {
                logger.error("Failed to copy {} to HDFS path {}", src, dst, e);
                return null;
            }

            return dst.toString(); // abs path to file
        }

        /*
         * Read from HDFS
         */
        /**
         * Deserializes one Java object from an HDFS file.
         *
         * @return the object, or null on any I/O or class-resolution failure
         */
        static Object deserializeObjectFromFile(String filePath) {
            logger.info("Deserialize HDFS file:{}", filePath);
            Path file = new Path(filePath);
            try {
                // Do NOT close the FileSystem: FileSystem.get() returns a shared, cached instance.
                FileSystem fs = FileSystem.get(newConfiguration());
                // Closing the ObjectInputStream also closes the underlying FSDataInputStream.
                try (ObjectInputStream ois = new ObjectInputStream(fs.open(file))) {
                    return ois.readObject();
                }
            } catch (IOException e) {
                logger.error("Failed to read HDFS file:{}", filePath, e);
            } catch (ClassNotFoundException e) {
                logger.error("Failed to resolve class while reading HDFS file:{}", filePath, e);
            }
            return null;
        }
    }

    private static class LocalFileSystemUtils {
        /**
         * Java-serializes an object into a local file.
         *
         * @return true on success, false on any I/O failure
         */
        static boolean serializeObjectToFile(Object obj, String fn) {
            // try-with-resources closes the ObjectOutputStream first (flushing it),
            // then the FileOutputStream -- the original closed them in the wrong order.
            try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(fn))) {
                oos.writeObject(obj);
                return true;
            } catch (IOException e) {
                logger.error("Failed to serialize object to local file:{}", fn, e);
                return false;
            }
        }

        /**
         * Deserializes one Java object from a local file.
         *
         * @return the object, or null on any I/O or class-resolution failure
         */
        static Object deserializeObjectFromLocalFile(String filename) {
            logger.info("Deserialize local file:{}", filename);
            try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(filename))) {
                return ois.readObject();
            } catch (IOException e) {
                logger.error("Failed to read local file:{}", filename, e);
            } catch (ClassNotFoundException e) {
                logger.error("Failed to resolve class while reading local file:{}", filename, e);
            }
            return null;
        }
    }

    /*
     * Create streams
     */
    public static void createKafkaTopic(String name, int partitions) {
        createKafkaTopic(name, partitions, 1);
    }

    public static void createKafkaTopic(String name, int partitions, int replicas) {
        KafkaUtils.createKafkaTopic(name, partitions, replicas);
    }

    /*
     * Serialize object
     */
    public static boolean serializeObjectToLocalFileSystem(Object object, String path) {
        return LocalFileSystemUtils.serializeObjectToFile(object, path);
    }

    /**
     * Serializes an object to a local temp file and copies it into HDFS.
     *
     * @return the absolute HDFS path, or null on failure
     */
    public static String serializeObjectToHDFS(Object object, String path) {
        try {
            File tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX);
            if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) {
                return HDFSUtils.writeToHDFS(tmpDatFile, path);
            }
        } catch (IOException e) {
            logger.error("Failed to create temp file for HDFS serialization", e);
        }
        return null;
    }

    /*
     * Deserialize object
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> deserializeMapFromFile(String filesystem, String filename) {
        // Any filesystem value other than HDFS falls back to the local FS
        if (filesystem.equals(HDFS)) {
            return (Map<String, Object>) HDFSUtils.deserializeObjectFromFile(filename);
        }
        return (Map<String, Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
    }

    public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) {
        Map<String, Object> map = deserializeMapFromFile(filesystem, filename);
        if (map == null) return null;
        return map.get(key);
    }

    /*
     * Setup
     */
    public static void setZookeeper(String zookeeper) {
        KafkaUtils.setZookeeper(zookeeper);
    }

    public static void setHadoopConfigHome(String hadoopHome) {
        HDFSUtils.setHadoopConfigHome(hadoopHome);
    }

    public static void setSAMOADir(String samoaDir) {
        HDFSUtils.setSAMOADir(samoaDir);
    }

    /*
     * Others
     */
    public static String getHDFSNameNodeUri() {
        return HDFSUtils.getNameNodeUri();
    }

    public static String getHadoopConfigHome() {
        return HDFSUtils.getHadoopConfigHome();
    }

    public static String copyToHDFS(File file, String dstPath) {
        return HDFSUtils.writeToHDFS(file, dstPath);
    }
}
+ %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + #L% + --> + +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${samza.log.dir}/${samza.container.name}.log" /> + <param name="DatePattern" value="'.'yyyy-MM-dd" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" /> + </layout> + </appender> + + <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender"> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d [%t] %-5p %c (%F:%L) - %m%n"/> + </layout> + </appender> + + <category name="com.yahoo.labs" additivity="false"> + <priority value="info"/> + <appender-ref ref="CONSOLE"/> + </category> + + <root> + <priority value="info" /> + <appender-ref ref="RollingAppender"/> + </root> +</log4j:configuration> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-storm/pom.xml b/samoa-storm/pom.xml new file mode 100644 index 0000000..7929593 --- /dev/null +++ b/samoa-storm/pom.xml @@ -0,0 +1,123 @@ +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 Yahoo! Inc. + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ #L% + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-storm</name> + <description>Storm bindings for SAMOA</description> + + <artifactId>samoa-storm</artifactId> + <parent> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <repositories> + <repository> <!-- repository for Storm --> + <id>clojars</id> + <url>http://clojars.org/repo/</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-test</artifactId> + <type>test-jar</type> + <classifier>test-jar-with-dependencies</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>storm</groupId> + <artifactId>storm</artifactId> + <version>${storm.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${zookeeper.storm.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j-log4j12.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- SAMOA assembly --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven-assembly-plugin.version}</version> + <configuration> + <finalName>SAMOA-Storm-${project.version}</finalName> + 
<appendAssemblyId>false</appendAssemblyId> + <attach>false</attach> + <outputDirectory>../target</outputDirectory> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifestEntries> + <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version> + <Bundle-Description>${project.description}</Bundle-Description> + <Implementation-Version>${project.version}</Implementation-Version> + <Implementation-Vendor>Yahoo Labs</Implementation-Vendor> + <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id> + </manifestEntries> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <argLine>-Xmx1G</argLine> + <redirectTestOutputToFile>false</redirectTestOutputToFile> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java new file mode 100644 index 0000000..54792ae --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java @@ -0,0 +1,78 @@ +package com.yahoo.labs.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; +import backtype.storm.utils.Utils; + +import com.yahoo.labs.samoa.topology.impl.StormSamoaUtils; +import com.yahoo.labs.samoa.topology.impl.StormTopology; + +/** + * The main class to execute a SAMOA task in LOCAL mode in Storm. + * + * @author Arinto Murdopo + * + */ +public class LocalStormDoTask { + + private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class); + + /** + * The main method. 
+ * + * @param args the arguments + */ + public static void main(String[] args) { + + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + int numWorker = StormSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + //convert the arguments into Storm topology + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + String topologyName = stormTopo.getTopologyName(); + + Config conf = new Config(); + //conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); + + //local mode + conf.setMaxTaskParallelism(numWorker); + + backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); + cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); + + backtype.storm.utils.Utils.sleep(600 * 1000); + + cluster.killTopology(topologyName); + cluster.shutdown(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java new file mode 100644 index 0000000..ad9794d --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java @@ -0,0 +1,65 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Values; +import com.yahoo.labs.samoa.core.ContentEvent; + +/** + * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class + * @author Arinto Murdopo + * + */ +class StormBoltStream extends StormStream{ + + /** + * + */ + private static final long serialVersionUID = -5712513402991550847L; + + private OutputCollector outputCollector; + + StormBoltStream(String stormComponentId){ + super(stormComponentId); + } + + @Override + public void put(ContentEvent contentEvent) { + outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey())); + } + + public void setCollector(OutputCollector outputCollector){ + this.outputCollector = outputCollector; + } + +// @Override +// public void setStreamId(String streamId) { +// // TODO Auto-generated method stub +// //this.outputStreamId = streamId; +// } + + @Override + public String getStreamId() { + // TODO Auto-generated method stub + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java new file mode 100644 index 0000000..347fd50 --- /dev/null +++ 
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.HashMap;
import java.util.Map;

import com.yahoo.labs.samoa.core.EntranceProcessor;
import com.yahoo.labs.samoa.core.Processor;
import com.yahoo.labs.samoa.topology.ComponentFactory;
import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
import com.yahoo.labs.samoa.topology.IProcessingItem;
import com.yahoo.labs.samoa.topology.ProcessingItem;
import com.yahoo.labs.samoa.topology.Stream;
import com.yahoo.labs.samoa.topology.Topology;

/**
 * Component factory implementation for samoa-storm.
 * Component names are made unique by suffixing the processor's canonical class
 * name with a per-class counter.
 */
public final class StormComponentFactory implements ComponentFactory {

    // Counts how many components were created per processor class name
    private final Map<String, Integer> processorList;

    public StormComponentFactory() {
        processorList = new HashMap<>();
    }

    @Override
    public ProcessingItem createPi(Processor processor) {
        return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), 1);
    }

    @Override
    public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
        return new StormEntranceProcessingItem(processor, this.getComponentName(processor.getClass()));
    }

    @Override
    public Stream createStream(IProcessingItem sourcePi) {
        StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi;
        return stormCompatiblePi.createStream();
    }

    @Override
    public Topology createTopology(String topoName) {
        return new StormTopology(topoName);
    }

    /**
     * Generates a unique component name of the form
     * {@code <canonical class name>_<index>}, where the index starts at 1 and
     * increments for each component created from the same processor class.
     */
    private String getComponentName(Class<? extends Processor> clazz) {
        String key = clazz.getCanonicalName();

        // Single lookup instead of containsKey() + get()
        Integer previous = processorList.get(key);
        Integer index = (previous == null) ? 1 : previous + 1;
        processorList.put(key, index);

        return key + '_' + index;
    }

    @Override
    public ProcessingItem createPi(Processor processor, int parallelism) {
        return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism);
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; +import backtype.storm.utils.Utils; + +/** + * The main class that used by samoa script to execute SAMOA task. + * + * @author Arinto Murdopo + * + */ +public class StormDoTask { + private static final Logger logger = LoggerFactory.getLogger(StormDoTask.class); + private static String localFlag = "local"; + private static String clusterFlag = "cluster"; + + /** + * The main method. + * + * @param args the arguments + */ + public static void main(String[] args) { + + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + boolean isLocal = isLocal(tmpArgs); + int numWorker = StormSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + //convert the arguments into Storm topology + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + String topologyName = stormTopo.getTopologyName(); + + Config conf = new Config(); + conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); + + + if(isLocal){ + //local mode + conf.setMaxTaskParallelism(numWorker); + + backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); + cluster.submitTopology(topologyName , conf, stormTopo.getStormBuilder().createTopology()); + + backtype.storm.utils.Utils.sleep(600*1000); + + cluster.killTopology(topologyName); + cluster.shutdown(); + + }else{ + //cluster mode + conf.setNumWorkers(numWorker); + try { + backtype.storm.StormSubmitter.submitTopology(topologyName, conf, + stormTopo.getStormBuilder().createTopology()); + } catch (backtype.storm.generated.AlreadyAliveException ale) { + ale.printStackTrace(); + } catch (backtype.storm.generated.InvalidTopologyException ite) { + ite.printStackTrace(); + } + } + } + + private static boolean 
isLocal(List<String> tmpArgs){ + ExecutionMode executionMode = ExecutionMode.UNDETERMINED; + + int position = tmpArgs.size() - 1; + String flag = tmpArgs.get(position); + boolean isLocal = true; + + if(flag.equals(clusterFlag)){ + executionMode = ExecutionMode.CLUSTER; + isLocal = false; + }else if(flag.equals(localFlag)){ + executionMode = ExecutionMode.LOCAL; + isLocal = true; + } + + if(executionMode != ExecutionMode.UNDETERMINED){ + tmpArgs.remove(position); + } + + return isLocal; + } + + private enum ExecutionMode {LOCAL, CLUSTER, UNDETERMINED}; +} + http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java new file mode 100644 index 0000000..d4d80bf --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java @@ -0,0 +1,208 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Map; +import java.util.UUID; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem; +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; + +/** + * EntranceProcessingItem implementation for Storm. + */ +class StormEntranceProcessingItem extends AbstractEntranceProcessingItem implements StormTopologyNode { + private final StormEntranceSpout piSpout; + + StormEntranceProcessingItem(EntranceProcessor processor) { + this(processor, UUID.randomUUID().toString()); + } + + StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) { + super(processor); + this.setName(friendlyId); + this.piSpout = new StormEntranceSpout(processor); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + // piSpout.streams.add(stream); + piSpout.setOutputStream((StormStream) stream); + return this; + } + + @Override + public Stream getOutputStream() { + return piSpout.getOutputStream(); + } + + @Override + public void addToTopology(StormTopology topology, int parallelismHint) { + topology.getStormBuilder().setSpout(this.getName(), piSpout, parallelismHint); + } + + @Override + public StormStream createStream() { + return piSpout.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + + /** + * Resulting 
Spout of StormEntranceProcessingItem + */ + final static class StormEntranceSpout extends BaseRichSpout { + + private static final long serialVersionUID = -9066409791668954099L; + + // private final Set<StormSpoutStream> streams; + private final EntranceProcessor entranceProcessor; + private StormStream outputStream; + + // private transient SpoutStarter spoutStarter; + // private transient Executor spoutExecutors; + // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue; + + private SpoutOutputCollector collector; + + StormEntranceSpout(EntranceProcessor processor) { + // this.streams = new HashSet<StormSpoutStream>(); + this.entranceProcessor = processor; + } + + public StormStream getOutputStream() { + return outputStream; + } + + public void setOutputStream(StormStream stream) { + this.outputStream = stream; + } + + @Override + public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>(); + + // Processor and this class share the same instance of stream + // for (StormSpoutStream stream : streams) { + // stream.setSpout(this); + // } + // outputStream.setSpout(this); + + this.entranceProcessor.onCreate(context.getThisTaskId()); + // this.spoutStarter = new SpoutStarter(this.starter); + + // this.spoutExecutors = Executors.newSingleThreadExecutor(); + // this.spoutExecutors.execute(spoutStarter); + } + + @Override + public void nextTuple() { + if (entranceProcessor.hasNext()) { + Values value = newValues(entranceProcessor.nextEvent()); + collector.emit(outputStream.getOutputId(), value); + } else + Utils.sleep(1000); + // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, TimeUnit.MILLISECONDS); + // if (tupleInfo != null) { + // Values value = newValues(tupleInfo.getContentEvent()); + // collector.emit(tupleInfo.getStormStream().getOutputId(), value); + // } + } + + @Override + public void 
declareOutputFields(OutputFieldsDeclarer declarer) { + // for (StormStream stream : streams) { + // declarer.declareStream(stream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD)); + // } + declarer.declareStream(outputStream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD)); + } + + StormStream createStream(String piId) { + // StormSpoutStream stream = new StormSpoutStream(piId); + StormStream stream = new StormBoltStream(piId); + // streams.add(stream); + return stream; + } + + // void put(StormSpoutStream stream, ContentEvent contentEvent) { + // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent)); + // } + + private Values newValues(ContentEvent contentEvent) { + return new Values(contentEvent, contentEvent.getKey()); + } + + // private final static class StormTupleInfo { + // + // private final StormStream stream; + // private final ContentEvent event; + // + // StormTupleInfo(StormStream stream, ContentEvent event) { + // this.stream = stream; + // this.event = event; + // } + // + // public StormStream getStormStream() { + // return this.stream; + // } + // + // public ContentEvent getContentEvent() { + // return this.event; + // } + // } + + // private final static class SpoutStarter implements Runnable { + // + // private final TopologyStarter topoStarter; + // + // SpoutStarter(TopologyStarter topoStarter) { + // this.topoStarter = topoStarter; + // } + // + // @Override + // public void run() { + // this.topoStarter.start(); + // } + // } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java new file mode 100644 
index 0000000..5f86855 --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java @@ -0,0 +1,75 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Properties; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.utils.Utils; + +/** + * Utility class to submit samoa-storm jar to a Storm cluster. 
+ * + * @author Arinto Murdopo + * + */ +public class StormJarSubmitter { + + public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation"; + + /** + * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + + Config config = new Config(); + config.putAll(Utils.readCommandLineOpts()); + config.putAll(Utils.readStormConfig()); + + String nimbusHost = (String) config.get(Config.NIMBUS_HOST); + int nimbusThriftPort = Utils.getInt(config + .get(Config.NIMBUS_THRIFT_PORT)); + + System.out.println("Nimbus host " + nimbusHost); + System.out.println("Nimbus thrift port " + nimbusThriftPort); + + System.out.println("uploading jar from " + args[0]); + String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]); + + System.out.println("Uploaded jar file location: "); + System.out.println(uploadedJarLocation); + + Properties props = StormSamoaUtils.getProperties(); + props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, uploadedJarLocation); + + File f = new File("src/main/resources/samoa-storm-cluster.properties"); + f.createNewFile(); + + OutputStream out = new FileOutputStream(f); + props.store(out, "properties file to store uploaded jar location from StormJarSubmitter"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java new file mode 100644 index 0000000..73879f6 --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java @@ -0,0 +1,170 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. 
+ * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.AbstractProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.impl.StormStream.InputStreamId; +import com.yahoo.labs.samoa.utils.PartitioningScheme; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; + +/** + * ProcessingItem implementation for Storm. + * @author Arinto Murdopo + * + */ +class StormProcessingItem extends AbstractProcessingItem implements StormTopologyNode { + private final ProcessingItemBolt piBolt; + private BoltDeclarer piBoltDeclarer; + + //TODO: should we put parallelism hint here? 
+ //imo, parallelism hint only declared when we add this PI in the topology + //open for dicussion :p + + StormProcessingItem(Processor processor, int parallelismHint){ + this(processor, UUID.randomUUID().toString(), parallelismHint); + } + + StormProcessingItem(Processor processor, String friendlyId, int parallelismHint){ + super(processor, parallelismHint); + this.piBolt = new ProcessingItemBolt(processor); + this.setName(friendlyId); + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StormStream stormInputStream = (StormStream) inputStream; + InputStreamId inputId = stormInputStream.getInputId(); + + switch(scheme) { + case SHUFFLE: + piBoltDeclarer.shuffleGrouping(inputId.getComponentId(),inputId.getStreamId()); + break; + case GROUP_BY_KEY: + piBoltDeclarer.fieldsGrouping( + inputId.getComponentId(), + inputId.getStreamId(), + new Fields(StormSamoaUtils.KEY_FIELD)); + break; + case BROADCAST: + piBoltDeclarer.allGrouping( + inputId.getComponentId(), + inputId.getStreamId()); + break; + } + return this; + } + + @Override + public void addToTopology(StormTopology topology, int parallelismHint) { + if(piBoltDeclarer != null){ + //throw exception that one PI only belong to one topology + }else{ + TopologyBuilder stormBuilder = topology.getStormBuilder(); + this.piBoltDeclarer = stormBuilder.setBolt(this.getName(), + this.piBolt, parallelismHint); + } + } + + @Override + public StormStream createStream() { + return piBolt.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + + private final static class ProcessingItemBolt extends BaseRichBolt{ + + private static final long serialVersionUID = -6637673741263199198L; + + private final Set<StormBoltStream> streams; + private 
final Processor processor; + + private OutputCollector collector; + + ProcessingItemBolt(Processor processor){ + this.streams = new HashSet<StormBoltStream>(); + this.processor = processor; + } + + @Override + public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + //Processor and this class share the same instance of stream + for(StormBoltStream stream: streams){ + stream.setCollector(this.collector); + } + + this.processor.onCreate(context.getThisTaskId()); + } + + @Override + public void execute(Tuple input) { + Object sentObject = input.getValue(0); + ContentEvent sentEvent = (ContentEvent)sentObject; + processor.process(sentEvent); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for(StormStream stream: streams){ + declarer.declareStream(stream.getOutputId(), + new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + StormSamoaUtils.KEY_FIELD)); + } + } + + StormStream createStream(String piId){ + StormBoltStream stream = new StormBoltStream(piId); + streams.add(stream); + return stream; + } + } +} + +
