http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java index c1bf5a2..a855e46 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java @@ -39,210 +39,209 @@ import com.yahoo.labs.samoa.utils.StreamDestination; * * @author Anh Thu Vu */ -public class SamzaStream extends AbstractStream implements Serializable { +public class SamzaStream extends AbstractStream implements Serializable { - /** + /** * */ - private static final long serialVersionUID = 1L; - - private static final String DEFAULT_SYSTEM_NAME = "kafka"; - - private List<SamzaSystemStream> systemStreams; - private transient MessageCollector collector; - private String systemName; - - /* - * Constructor - */ - public SamzaStream(IProcessingItem sourcePi) { - super(sourcePi); - this.systemName = DEFAULT_SYSTEM_NAME; - // Get name/id for this stream - SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi; - int index = samzaPi.addOutputStream(this); - this.setStreamId(samzaPi.getName()+"-"+Integer.toString(index)); - // init list of SamzaSystemStream - systemStreams = new ArrayList<SamzaSystemStream>(); - } - - /* - * System name (Kafka) - */ - public void setSystemName(String systemName) { - this.systemName = systemName; - for (SamzaSystemStream systemStream:systemStreams) { - systemStream.setSystem(systemName); - } - } - - public String getSystemName() { - return this.systemName; - } - - /* - * Add the PI to the list of destinations. - * Return the name of the corresponding SystemStream. - */ - public SamzaSystemStream addDestination(StreamDestination destination) { - PartitioningScheme scheme = destination.getPartitioningScheme(); - int parallelism = destination.getParallelism(); - - SamzaSystemStream resultStream = null; - for (int i=0; i<systemStreams.size(); i++) { - // There is an existing SystemStream that matches the settings. 
- // Do not create a new one - if (systemStreams.get(i).isSame(scheme, parallelism)) { - resultStream = systemStreams.get(i); - } - } - - // No existing SystemStream match the requirement - // Create a new one - if (resultStream == null) { - String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size()); - resultStream = new SamzaSystemStream(this.systemName,topicName,scheme,parallelism); - systemStreams.add(resultStream); - } - - return resultStream; - } - - public void setCollector(MessageCollector collector) { - this.collector = collector; - } - - public MessageCollector getCollector(){ - return this.collector; - } - - public void onCreate() { - for (SamzaSystemStream stream:systemStreams) { - stream.initSystemStream(); - } - } - - /* - * Implement Stream interface - */ - @Override - public void put(ContentEvent event) { - for (SamzaSystemStream stream:systemStreams) { - stream.send(collector,event); - } - } - - public List<SamzaSystemStream> getSystemStreams() { - return this.systemStreams; - } - - /** - * SamzaSystemStream wrap around a Samza's SystemStream - * It contains the info to create a Samza stream during the - * constructing process of the topology and - * will create the actual Samza stream when the topology is submitted - * (invoking initSystemStream()) - * - * @author Anh Thu Vu - */ - public static class SamzaSystemStream implements Serializable { - /** + private static final long serialVersionUID = 1L; + + private static final String DEFAULT_SYSTEM_NAME = "kafka"; + + private List<SamzaSystemStream> systemStreams; + private transient MessageCollector collector; + private String systemName; + + /* + * Constructor + */ + public SamzaStream(IProcessingItem sourcePi) { + super(sourcePi); + this.systemName = DEFAULT_SYSTEM_NAME; + // Get name/id for this stream + SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi; + int index = samzaPi.addOutputStream(this); + this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index)); + // init list of SamzaSystemStream + systemStreams = new ArrayList<SamzaSystemStream>(); + } + + /* + * System name (Kafka) + */ + public void setSystemName(String systemName) { + this.systemName = systemName; + for (SamzaSystemStream systemStream : systemStreams) { + systemStream.setSystem(systemName); + } + } + + public String getSystemName() { + return this.systemName; + } + + /* + * Add the PI to the list of destinations. Return the name of the + * corresponding SystemStream. + */ + public SamzaSystemStream addDestination(StreamDestination destination) { + PartitioningScheme scheme = destination.getPartitioningScheme(); + int parallelism = destination.getParallelism(); + + SamzaSystemStream resultStream = null; + for (int i = 0; i < systemStreams.size(); i++) { + // There is an existing SystemStream that matches the settings. 
+ // Do not create a new one + if (systemStreams.get(i).isSame(scheme, parallelism)) { + resultStream = systemStreams.get(i); + } + } + + // No existing SystemStream match the requirement + // Create a new one + if (resultStream == null) { + String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size()); + resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism); + systemStreams.add(resultStream); + } + + return resultStream; + } + + public void setCollector(MessageCollector collector) { + this.collector = collector; + } + + public MessageCollector getCollector() { + return this.collector; + } + + public void onCreate() { + for (SamzaSystemStream stream : systemStreams) { + stream.initSystemStream(); + } + } + + /* + * Implement Stream interface + */ + @Override + public void put(ContentEvent event) { + for (SamzaSystemStream stream : systemStreams) { + stream.send(collector, event); + } + } + + public List<SamzaSystemStream> getSystemStreams() { + return this.systemStreams; + } + + /** + * SamzaSystemStream wrap around a Samza's SystemStream It contains the info + * to create a Samza stream during the constructing process of the topology + * and will create the actual Samza stream when the topology is submitted + * (invoking initSystemStream()) + * + * @author Anh Thu Vu + */ + public static class SamzaSystemStream implements Serializable { + /** * */ - private static final long serialVersionUID = 1L; - private String system; - private String stream; - private PartitioningScheme scheme; - private int parallelism; - - private transient SystemStream actualSystemStream = null; - - /* - * Constructors - */ - public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) { - this.system = system; - this.stream = stream; - this.scheme = scheme; - this.parallelism = parallelism; - } - - public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) { - this(system, stream, scheme, 1); - } - - /* - * Setters - */ - public void setSystem(String system) { - this.system = system; - } - - /* - * Getters - */ - public String getSystem() { - return this.system; - } - - public String getStream() { - return this.stream; - } - - public PartitioningScheme getPartitioningScheme() { - return this.scheme; - } - - public int getParallelism() { - return this.parallelism; - } - - public boolean isSame(PartitioningScheme scheme, int parallelismHint) { - return (this.scheme == scheme && this.parallelism == parallelismHint); - } - - /* - * Init the actual Samza stream - */ - public void initSystemStream() { - actualSystemStream = new SystemStream(this.system, this.stream); - } - - /* - * Send a ContentEvent - */ - public void send(MessageCollector collector, ContentEvent contentEvent) { - if (actualSystemStream == null) - this.initSystemStream(); - - switch(this.scheme) { - case SHUFFLE: - this.sendShuffle(collector, contentEvent); - break; - case GROUP_BY_KEY: - this.sendGroupByKey(collector, contentEvent); - break; - case BROADCAST: - this.sendBroadcast(collector, contentEvent); - break; - } - } - - /* - * Helpers - */ - private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) { - collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event)); - } - - private void sendGroupByKey(MessageCollector collector, ContentEvent event) { - collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event)); - } - - private synchronized void 
sendBroadcast(MessageCollector collector, ContentEvent event) { - for (int i=0; i<parallelism; i++) { - collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event)); - } - } - } + private static final long serialVersionUID = 1L; + private String system; + private String stream; + private PartitioningScheme scheme; + private int parallelism; + + private transient SystemStream actualSystemStream = null; + + /* + * Constructors + */ + public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) { + this.system = system; + this.stream = stream; + this.scheme = scheme; + this.parallelism = parallelism; + } + + public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) { + this(system, stream, scheme, 1); + } + + /* + * Setters + */ + public void setSystem(String system) { + this.system = system; + } + + /* + * Getters + */ + public String getSystem() { + return this.system; + } + + public String getStream() { + return this.stream; + } + + public PartitioningScheme getPartitioningScheme() { + return this.scheme; + } + + public int getParallelism() { + return this.parallelism; + } + + public boolean isSame(PartitioningScheme scheme, int parallelismHint) { + return (this.scheme == scheme && this.parallelism == parallelismHint); + } + + /* + * Init the actual Samza stream + */ + public void initSystemStream() { + actualSystemStream = new SystemStream(this.system, this.stream); + } + + /* + * Send a ContentEvent + */ + public void send(MessageCollector collector, ContentEvent contentEvent) { + if (actualSystemStream == null) + this.initSystemStream(); + + switch (this.scheme) { + case SHUFFLE: + this.sendShuffle(collector, contentEvent); + break; + case GROUP_BY_KEY: + this.sendGroupByKey(collector, contentEvent); + break; + case BROADCAST: + this.sendBroadcast(collector, contentEvent); + break; + } + } + + /* + * Helpers + */ + private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) { + collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event)); + } + + private void sendGroupByKey(MessageCollector collector, ContentEvent event) { + collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event)); + } + + private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) { + for (int i = 0; i < parallelism; i++) { + collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event)); + } + } + } } \ No newline at end of file
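A note on the reformatted SamzaStream above: put() forwards each ContentEvent to every attached SamzaSystemStream, and send() dispatches on the destination's PartitioningScheme — SHUFFLE emits a single envelope and lets the underlying system pick a partition, GROUP_BY_KEY partitions on event.getKey(), and BROADCAST emits one envelope per downstream partition, which is why SamzaSystemStream stores the parallelism alongside the scheme. The following is a minimal, self-contained sketch of that dispatch logic only; the Envelope and Collector types are hypothetical stand-ins for Samza's OutgoingMessageEnvelope and MessageCollector and are not part of this patch.

import java.util.ArrayList;
import java.util.List;

public class PartitioningSketch {

    enum PartitioningScheme { SHUFFLE, GROUP_BY_KEY, BROADCAST }

    // Hypothetical stand-in for OutgoingMessageEnvelope: partitionKey selects
    // the partition (null lets the producer decide); message is the payload.
    record Envelope(Object partitionKey, Object message) {}

    // Hypothetical stand-in for Samza's MessageCollector.
    interface Collector {
        void send(Envelope envelope);
    }

    static void send(Collector collector, PartitioningScheme scheme,
            int parallelism, String key, Object event) {
        switch (scheme) {
        case SHUFFLE:
            // One envelope with no partition key: the producer picks a partition.
            collector.send(new Envelope(null, event));
            break;
        case GROUP_BY_KEY:
            // Partition by key so events with equal keys reach the same task.
            collector.send(new Envelope(key, event));
            break;
        case BROADCAST:
            // One envelope per partition index, reaching every task instance.
            for (int i = 0; i < parallelism; i++) {
                collector.send(new Envelope(i, event));
            }
            break;
        }
    }

    public static void main(String[] args) {
        List<Envelope> sent = new ArrayList<>();
        send(sent::add, PartitioningScheme.BROADCAST, 3, "key", "payload");
        System.out.println(sent.size() + " envelopes sent"); // prints: 3 envelopes sent
    }
}
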
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java index a169bc2..4e52966 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java @@ -32,33 +32,33 @@ import com.yahoo.labs.samoa.topology.AbstractTopology; * @author Anh Thu Vu */ public class SamzaTopology extends AbstractTopology { - private int procItemCounter; - - public SamzaTopology(String topoName) { - super(topoName); - procItemCounter = 0; - } - - @Override - public void addProcessingItem(IProcessingItem procItem, int parallelism) { - super.addProcessingItem(procItem, parallelism); - SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem; - samzaPi.setName(this.getTopologyName()+"-"+Integer.toString(procItemCounter)); - procItemCounter++; - } - - /* - * Gets the set of ProcessingItems, excluding EntrancePIs - * Used by SamzaConfigFactory as the config for EntrancePIs and - * normal PIs are different - */ - public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception { - Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>(); - copiedSet.addAll(this.getProcessingItems()); - boolean result = copiedSet.removeAll(this.getEntranceProcessingItems()); - if (!result) { - throw new Exception("Failed extracting the set of non-entrance processing items"); - } - return copiedSet; - } + private int procItemCounter; + + public SamzaTopology(String topoName) { + super(topoName); + procItemCounter = 0; + } + + @Override + public void addProcessingItem(IProcessingItem procItem, int parallelism) { + super.addProcessingItem(procItem, parallelism); + SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem; + samzaPi.setName(this.getTopologyName() + "-" + Integer.toString(procItemCounter)); + procItemCounter++; + } + + /* + * Gets the set of ProcessingItems, excluding EntrancePIs Used by + * SamzaConfigFactory as the config for EntrancePIs and normal PIs are + * different + */ + public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception { + Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>(); + copiedSet.addAll(this.getProcessingItems()); + boolean result = copiedSet.removeAll(this.getEntranceProcessingItems()); + if (!result) { + throw new Exception("Failed extracting the set of non-entrance processing items"); + } + return copiedSet; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java index 56427d0..03f35f1 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java @@ -51,482 +51,489 @@ import com.yahoo.labs.samoa.topology.impl.SamzaTopology; import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream; /** - * Generate Configs that will be used 
to submit Samza jobs - * from the input topology (one config per PI/EntrancePI in - * the topology) + * Generate Configs that will be used to submit Samza jobs from the input + * topology (one config per PI/EntrancePI in the topology) * * @author Anh Thu Vu - * + * */ public class SamzaConfigFactory { - public static final String SYSTEM_NAME = "samoa"; - - // DEFAULT VALUES - private static final String DEFAULT_ZOOKEEPER = "localhost:2181"; - private static final String DEFAULT_BROKER_LIST = "localhost:9092"; - - // DELIMINATORS - public static final String COMMA = ","; - public static final String COLON = ":"; - public static final String DOT = "."; - public static final char DOLLAR_SIGN = '$'; - public static final char QUESTION_MARK = '?'; - - // PARTITIONING SCHEMES - public static final String SHUFFLE = "shuffle"; - public static final String KEY = "key"; - public static final String BROADCAST = "broadcast"; - - // PROPERTY KEYS - // JOB - public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class"; - public static final String JOB_NAME_KEY = "job.name"; - // YARN - public static final String YARN_PACKAGE_KEY = "yarn.package.path"; - public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb"; - public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb"; - public static final String CONTAINER_COUNT_KEY = "yarn.container.count"; - // TASK (SAMZA original) - public static final String TASK_CLASS_KEY = "task.class"; - public static final String TASK_INPUTS_KEY = "task.inputs"; - // TASK (extra) - public static final String FILE_KEY = "task.processor.file"; - public static final String FILESYSTEM_KEY = "task.processor.filesystem"; - public static final String ENTRANCE_INPUT_KEY = "task.entrance.input"; - public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs"; - public static final String YARN_CONF_HOME_KEY = "yarn.config.home"; - // KAFKA - public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect"; - public static final String BROKER_URI_KEY = "producer.metadata.broker.list"; - public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages"; - public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type"; - // SERDE - public static final String SERDE_REGISTRATION_KEY = "kryo.register"; - - // Instance variables - private boolean isLocalMode; - private String zookeeper; - private String kafkaBrokerList; - private int replicationFactor; - private int amMemory; - private int containerMemory; - private int piPerContainerRatio; - private int checkpointFrequency; // in ms - - private String jarPath; - private String kryoRegisterFile = null; - - public SamzaConfigFactory() { - this.isLocalMode = false; - this.zookeeper = DEFAULT_ZOOKEEPER; - this.kafkaBrokerList = DEFAULT_BROKER_LIST; - this.checkpointFrequency = 60000; // default: 1 minute - this.replicationFactor = 1; - } - - /* - * Setter methods - */ - public SamzaConfigFactory setYarnPackage(String packagePath) { - this.jarPath = packagePath; - return this; - } - - public SamzaConfigFactory setLocalMode(boolean isLocal) { - this.isLocalMode = isLocal; - return this; - } - - public SamzaConfigFactory setZookeeper(String zk) { - this.zookeeper = zk; - return this; - } - - public SamzaConfigFactory setKafka(String brokerList) { - this.kafkaBrokerList = brokerList; - return this; - } - - public SamzaConfigFactory setCheckpointFrequency(int freq) { - this.checkpointFrequency = freq; - return this; - } - - public SamzaConfigFactory 
setReplicationFactor(int replicationFactor) { - this.replicationFactor = replicationFactor; - return this; - } - - public SamzaConfigFactory setAMMemory(int mem) { - this.amMemory = mem; - return this; - } - - public SamzaConfigFactory setContainerMemory(int mem) { - this.containerMemory = mem; - return this; - } - - public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) { - this.piPerContainerRatio = piPerContainer; - return this; - } - - public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) { - this.kryoRegisterFile = kryoRegister; - return this; - } - - /* - * Generate a map of all config properties for the input SamzaProcessingItem - */ - private Map<String,String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception { - Map<String,String> map = getBasicSystemConfig(); - - // Set job name, task class, task inputs (from SamzaProcessingItem) - setJobName(map, pi.getName()); - setTaskClass(map, SamzaProcessingItem.class.getName()); - - StringBuilder streamNames = new StringBuilder(); - boolean first = true; - for(SamzaSystemStream stream:pi.getInputStreams()) { - if (!first) streamNames.append(COMMA); - streamNames.append(stream.getSystem()+DOT+stream.getStream()); - if (first) first = false; - } - setTaskInputs(map, streamNames.toString()); - - // Processor file - setFileName(map, filename); - setFileSystem(map, filesystem); - - List<String> nameList = new ArrayList<String>(); - // Default kafka system: kafka0: sync producer - // This system is always required: it is used for checkpointing - nameList.add("kafka0"); - setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1); - // Output streams: set kafka systems - for (SamzaStream stream:pi.getOutputStreams()) { - boolean found = false; - for (String name:nameList) { - if (stream.getSystemName().equals(name)) { - found = true; - break; - } - } - if (!found) { - nameList.add(stream.getSystemName()); - setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize()); - } - } - // Input streams: set kafka systems - for (SamzaSystemStream stream:pi.getInputStreams()) { - boolean found = false; - for (String name:nameList) { - if (stream.getSystem().equals(name)) { - found = true; - break; - } - } - if (!found) { - nameList.add(stream.getSystem()); - setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1); - } - } - - // Checkpointing - setValue(map,"task.checkpoint.factory","org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"); - setValue(map,"task.checkpoint.system","kafka0"); - setValue(map,"task.commit.ms","1000"); - setValue(map,"task.checkpoint.replication.factor",Integer.toString(this.replicationFactor)); - - // Number of containers - setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio); - - return map; - } - - /* - * Generate a map of all config properties for the input SamzaProcessingItem - */ - public Map<String,String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) { - Map<String,String> map = getBasicSystemConfig(); - - // Set job name, task class (from SamzaEntranceProcessingItem) - setJobName(map, epi.getName()); - setTaskClass(map, SamzaEntranceProcessingItem.class.getName()); - - // Input for the entrance task (from our custom consumer) - setTaskInputs(map, SYSTEM_NAME+"."+epi.getName()); - - // Output from entrance task - // Since entrancePI should have only 1 output stream - // there is no need for checking the 
batch size, setting different system names - // The custom consumer (samoa system) does not suuport reading from a specific index - // => no need for checkpointing - SamzaStream outputStream = (SamzaStream)epi.getOutputStream(); - // Set samoa system factory - setValue(map, "systems."+SYSTEM_NAME+".samza.factory", SamoaSystemFactory.class.getName()); - // Set Kafka system (only if there is an output stream) - if (outputStream != null) - setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList, outputStream.getBatchSize()); - - // Processor file - setFileName(map, filename); - setFileSystem(map, filesystem); - - // Number of containers - setNumberOfContainers(map, 1, this.piPerContainerRatio); - - return map; - } - - /* - * Generate a list of map (of config properties) for all PIs and EPI in - * the input topology - */ - public List<Map<String,String>> getMapsForTopology(SamzaTopology topology) throws Exception { - - List<Map<String,String>> maps = new ArrayList<Map<String,String>>(); - - // File to write serialized objects - String filename = topology.getTopologyName() + ".dat"; - Path dirPath = FileSystems.getDefault().getPath("dat"); - Path filePath= FileSystems.getDefault().getPath(dirPath.toString(), filename); - String dstPath = filePath.toString(); - String resPath; - String filesystem; - if (this.isLocalMode) { - filesystem = SystemsUtils.LOCAL_FS; - File dir = dirPath.toFile(); - if (!dir.exists()) - FileUtils.forceMkdir(dir); - } - else { - filesystem = SystemsUtils.HDFS; - } - - // Correct system name for streams - this.setSystemNameForStreams(topology.getStreams()); - - // Add all PIs to a collection (map) - Map<String,Object> piMap = new HashMap<String,Object>(); - Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems(); - Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems(); - for(EntranceProcessingItem epi:entranceProcessingItems) { - SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; - piMap.put(sepi.getName(), sepi); - } - for(IProcessingItem pi:processingItems) { - SamzaProcessingItem spi = (SamzaProcessingItem) pi; - piMap.put(spi.getName(), spi); - } - - // Serialize all PIs - boolean serialized = false; - if (this.isLocalMode) { - serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath); - resPath = dstPath; - } - else { - resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath); - serialized = resPath != null; - } - - if (!serialized) { - throw new Exception("Fail serialize map of PIs to file"); - } - - // MapConfig for all PIs - for(EntranceProcessingItem epi:entranceProcessingItems) { - SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; - maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem)); - } - for(IProcessingItem pi:processingItems) { - SamzaProcessingItem spi = (SamzaProcessingItem) pi; - maps.add(this.getMapForPI(spi, resPath, filesystem)); - } - - return maps; - } - - /** - * Construct a list of MapConfigs for a Topology - * @return the list of MapConfigs - * @throws Exception - */ - public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception { - List<MapConfig> configs = new ArrayList<MapConfig>(); - List<Map<String,String>> maps = this.getMapsForTopology(topology); - for(Map<String,String> map:maps) { - configs.add(new MapConfig(map)); - } - return configs; - } - - /* + public static final String SYSTEM_NAME = "samoa"; + + // DEFAULT VALUES + private static final String 
DEFAULT_ZOOKEEPER = "localhost:2181"; + private static final String DEFAULT_BROKER_LIST = "localhost:9092"; + + // DELIMINATORS + public static final String COMMA = ","; + public static final String COLON = ":"; + public static final String DOT = "."; + public static final char DOLLAR_SIGN = '$'; + public static final char QUESTION_MARK = '?'; + + // PARTITIONING SCHEMES + public static final String SHUFFLE = "shuffle"; + public static final String KEY = "key"; + public static final String BROADCAST = "broadcast"; + + // PROPERTY KEYS + // JOB + public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class"; + public static final String JOB_NAME_KEY = "job.name"; + // YARN + public static final String YARN_PACKAGE_KEY = "yarn.package.path"; + public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb"; + public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb"; + public static final String CONTAINER_COUNT_KEY = "yarn.container.count"; + // TASK (SAMZA original) + public static final String TASK_CLASS_KEY = "task.class"; + public static final String TASK_INPUTS_KEY = "task.inputs"; + // TASK (extra) + public static final String FILE_KEY = "task.processor.file"; + public static final String FILESYSTEM_KEY = "task.processor.filesystem"; + public static final String ENTRANCE_INPUT_KEY = "task.entrance.input"; + public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs"; + public static final String YARN_CONF_HOME_KEY = "yarn.config.home"; + // KAFKA + public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect"; + public static final String BROKER_URI_KEY = "producer.metadata.broker.list"; + public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages"; + public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type"; + // SERDE + public static final String SERDE_REGISTRATION_KEY = "kryo.register"; + + // Instance variables + private boolean isLocalMode; + private String zookeeper; + private String kafkaBrokerList; + private int replicationFactor; + private int amMemory; + private int containerMemory; + private int piPerContainerRatio; + private int checkpointFrequency; // in ms + + private String jarPath; + private String kryoRegisterFile = null; + + public SamzaConfigFactory() { + this.isLocalMode = false; + this.zookeeper = DEFAULT_ZOOKEEPER; + this.kafkaBrokerList = DEFAULT_BROKER_LIST; + this.checkpointFrequency = 60000; // default: 1 minute + this.replicationFactor = 1; + } + + /* + * Setter methods + */ + public SamzaConfigFactory setYarnPackage(String packagePath) { + this.jarPath = packagePath; + return this; + } + + public SamzaConfigFactory setLocalMode(boolean isLocal) { + this.isLocalMode = isLocal; + return this; + } + + public SamzaConfigFactory setZookeeper(String zk) { + this.zookeeper = zk; + return this; + } + + public SamzaConfigFactory setKafka(String brokerList) { + this.kafkaBrokerList = brokerList; + return this; + } + + public SamzaConfigFactory setCheckpointFrequency(int freq) { + this.checkpointFrequency = freq; + return this; + } + + public SamzaConfigFactory setReplicationFactor(int replicationFactor) { + this.replicationFactor = replicationFactor; + return this; + } + + public SamzaConfigFactory setAMMemory(int mem) { + this.amMemory = mem; + return this; + } + + public SamzaConfigFactory setContainerMemory(int mem) { + this.containerMemory = mem; + return this; + } + + public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) { + 
this.piPerContainerRatio = piPerContainer; + return this; + } + + public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) { + this.kryoRegisterFile = kryoRegister; + return this; + } + + /* + * Generate a map of all config properties for the input SamzaProcessingItem + */ + private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception { + Map<String, String> map = getBasicSystemConfig(); + + // Set job name, task class, task inputs (from SamzaProcessingItem) + setJobName(map, pi.getName()); + setTaskClass(map, SamzaProcessingItem.class.getName()); + + StringBuilder streamNames = new StringBuilder(); + boolean first = true; + for (SamzaSystemStream stream : pi.getInputStreams()) { + if (!first) + streamNames.append(COMMA); + streamNames.append(stream.getSystem() + DOT + stream.getStream()); + if (first) + first = false; + } + setTaskInputs(map, streamNames.toString()); + + // Processor file + setFileName(map, filename); + setFileSystem(map, filesystem); + + List<String> nameList = new ArrayList<String>(); + // Default kafka system: kafka0: sync producer + // This system is always required: it is used for checkpointing + nameList.add("kafka0"); + setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1); + // Output streams: set kafka systems + for (SamzaStream stream : pi.getOutputStreams()) { + boolean found = false; + for (String name : nameList) { + if (stream.getSystemName().equals(name)) { + found = true; + break; + } + } + if (!found) { + nameList.add(stream.getSystemName()); + setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize()); + } + } + // Input streams: set kafka systems + for (SamzaSystemStream stream : pi.getInputStreams()) { + boolean found = false; + for (String name : nameList) { + if (stream.getSystem().equals(name)) { + found = true; + break; + } + } + if (!found) { + nameList.add(stream.getSystem()); + setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1); + } + } + + // Checkpointing + setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"); + setValue(map, "task.checkpoint.system", "kafka0"); + setValue(map, "task.commit.ms", "1000"); + setValue(map, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor)); + + // Number of containers + setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio); + + return map; + } + + /* + * Generate a map of all config properties for the input SamzaProcessingItem + */ + public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) { + Map<String, String> map = getBasicSystemConfig(); + + // Set job name, task class (from SamzaEntranceProcessingItem) + setJobName(map, epi.getName()); + setTaskClass(map, SamzaEntranceProcessingItem.class.getName()); + + // Input for the entrance task (from our custom consumer) + setTaskInputs(map, SYSTEM_NAME + "." + epi.getName()); + + // Output from entrance task + // Since entrancePI should have only 1 output stream + // there is no need for checking the batch size, setting different system + // names + // The custom consumer (samoa system) does not suuport reading from a + // specific index + // => no need for checkpointing + SamzaStream outputStream = (SamzaStream) epi.getOutputStream(); + // Set samoa system factory + setValue(map, "systems." 
+ SYSTEM_NAME + ".samza.factory", SamoaSystemFactory.class.getName()); + // Set Kafka system (only if there is an output stream) + if (outputStream != null) + setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList, + outputStream.getBatchSize()); + + // Processor file + setFileName(map, filename); + setFileSystem(map, filesystem); + + // Number of containers + setNumberOfContainers(map, 1, this.piPerContainerRatio); + + return map; + } + + /* + * Generate a list of map (of config properties) for all PIs and EPI in the + * input topology + */ + public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) throws Exception { + + List<Map<String, String>> maps = new ArrayList<Map<String, String>>(); + + // File to write serialized objects + String filename = topology.getTopologyName() + ".dat"; + Path dirPath = FileSystems.getDefault().getPath("dat"); + Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename); + String dstPath = filePath.toString(); + String resPath; + String filesystem; + if (this.isLocalMode) { + filesystem = SystemsUtils.LOCAL_FS; + File dir = dirPath.toFile(); + if (!dir.exists()) + FileUtils.forceMkdir(dir); + } + else { + filesystem = SystemsUtils.HDFS; + } + + // Correct system name for streams + this.setSystemNameForStreams(topology.getStreams()); + + // Add all PIs to a collection (map) + Map<String, Object> piMap = new HashMap<String, Object>(); + Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems(); + Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems(); + for (EntranceProcessingItem epi : entranceProcessingItems) { + SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; + piMap.put(sepi.getName(), sepi); + } + for (IProcessingItem pi : processingItems) { + SamzaProcessingItem spi = (SamzaProcessingItem) pi; + piMap.put(spi.getName(), spi); + } + + // Serialize all PIs + boolean serialized = false; + if (this.isLocalMode) { + serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath); + resPath = dstPath; + } + else { + resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath); + serialized = resPath != null; + } + + if (!serialized) { + throw new Exception("Fail serialize map of PIs to file"); + } + + // MapConfig for all PIs + for (EntranceProcessingItem epi : entranceProcessingItems) { + SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; + maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem)); + } + for (IProcessingItem pi : processingItems) { + SamzaProcessingItem spi = (SamzaProcessingItem) pi; + maps.add(this.getMapForPI(spi, resPath, filesystem)); + } + + return maps; + } + + /** + * Construct a list of MapConfigs for a Topology + * + * @return the list of MapConfigs + * @throws Exception + */ + public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception { + List<MapConfig> configs = new ArrayList<MapConfig>(); + List<Map<String, String>> maps = this.getMapsForTopology(topology); + for (Map<String, String> map : maps) { + configs.add(new MapConfig(map)); + } + return configs; + } + + /* * */ - public void setSystemNameForStreams(Set<Stream> streams) { - Map<Integer, String> batchSizeMap = new HashMap<Integer, String>(); - batchSizeMap.put(1, "kafka0"); // default system with sync producer - int counter = 0; - for (Stream stream:streams) { - SamzaStream samzaStream = (SamzaStream) stream; - String systemName = 
batchSizeMap.get(samzaStream.getBatchSize()); - if (systemName == null) { - counter++; - // Add new system - systemName = "kafka"+Integer.toString(counter); - batchSizeMap.put(samzaStream.getBatchSize(), systemName); - } - samzaStream.setSystemName(systemName); - } - -} - - /* - * Generate a map with common properties for PIs and EPI - */ - private Map<String,String> getBasicSystemConfig() { - Map<String,String> map = new HashMap<String,String>(); - // Job & Task - if (this.isLocalMode) - map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName()); - else { - map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName()); - - // yarn - map.put(YARN_PACKAGE_KEY,jarPath); - map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory)); - map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory)); - map.put(CONTAINER_COUNT_KEY, "1"); - map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome()); - - // Task opts (Heap size = 0.75 container memory) - int heapSize = (int)(0.75*this.containerMemory); - map.put("task.opts", "-Xmx"+Integer.toString(heapSize)+"M -XX:+PrintGCDateStamps"); - } - - - map.put(JOB_NAME_KEY, ""); - map.put(TASK_CLASS_KEY, ""); - map.put(TASK_INPUTS_KEY, ""); - - // register serializer - map.put("serializers.registry.kryo.class",SamzaKryoSerdeFactory.class.getName()); - - // Serde registration - setKryoRegistration(map, this.kryoRegisterFile); - - return map; - } - - /* - * Helper methods to set different properties in the input map - */ - private static void setJobName(Map<String,String> map, String jobName) { - map.put(JOB_NAME_KEY, jobName); - } - - private static void setFileName(Map<String,String> map, String filename) { - map.put(FILE_KEY, filename); - } - - private static void setFileSystem(Map<String,String> map, String filesystem) { - map.put(FILESYSTEM_KEY, filesystem); - } - - private static void setTaskClass(Map<String,String> map, String taskClass) { - map.put(TASK_CLASS_KEY, taskClass); - } - - private static void setTaskInputs(Map<String,String> map, String inputs) { - map.put(TASK_INPUTS_KEY, inputs); - } - - private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) { - if (kryoRegisterFile != null) { - String value = readKryoRegistration(kryoRegisterFile); - map.put(SERDE_REGISTRATION_KEY, value); - } - } - - private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) { - int res = parallelism / piPerContainer; - if (parallelism % piPerContainer != 0) res++; - map.put(CONTAINER_COUNT_KEY, Integer.toString(res)); - } - - private static void setKafkaSystem(Map<String,String> map, String systemName, String zk, String brokers, int batchSize) { - map.put("systems."+systemName+".samza.factory",KafkaSystemFactory.class.getName()); - map.put("systems."+systemName+".samza.msg.serde","kryo"); - - map.put("systems."+systemName+"."+ZOOKEEPER_URI_KEY,zk); - map.put("systems."+systemName+"."+BROKER_URI_KEY,brokers); - map.put("systems."+systemName+"."+KAFKA_BATCHSIZE_KEY,Integer.toString(batchSize)); - - map.put("systems."+systemName+".samza.offset.default","oldest"); - - if (batchSize > 1) { - map.put("systems."+systemName+"."+KAFKA_PRODUCER_TYPE_KEY,"async"); - } - else { - map.put("systems."+systemName+"."+KAFKA_PRODUCER_TYPE_KEY,"sync"); - } - } - - // Set custom properties - private static void setValue(Map<String,String> map, String key, String value) { - map.put(key,value); - } - - /* - * Helper method to parse Kryo registration file - */ - private static String 
readKryoRegistration(String filePath) { - FileInputStream fis = null; - Properties props = new Properties(); - StringBuilder result = new StringBuilder(); - try { - fis = new FileInputStream(filePath); - props.load(fis); - - boolean first = true; - String value = null; - for(String k:props.stringPropertyNames()) { - if (!first) - result.append(COMMA); - else - first = false; - - // Need to avoid the dollar sign as samza pass all the properties in - // the config to containers via commandline parameters/enviroment variables - // We might escape the dollar sign, but it's more complicated than - // replacing it with something else - result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); - value = props.getProperty(k); - if (value != null && value.trim().length() > 0) { - result.append(COLON); - result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); - } - } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - if (fis != null) - try { - fis.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - return result.toString(); - } + public void setSystemNameForStreams(Set<Stream> streams) { + Map<Integer, String> batchSizeMap = new HashMap<Integer, String>(); + batchSizeMap.put(1, "kafka0"); // default system with sync producer + int counter = 0; + for (Stream stream : streams) { + SamzaStream samzaStream = (SamzaStream) stream; + String systemName = batchSizeMap.get(samzaStream.getBatchSize()); + if (systemName == null) { + counter++; + // Add new system + systemName = "kafka" + Integer.toString(counter); + batchSizeMap.put(samzaStream.getBatchSize(), systemName); + } + samzaStream.setSystemName(systemName); + } + + } + + /* + * Generate a map with common properties for PIs and EPI + */ + private Map<String, String> getBasicSystemConfig() { + Map<String, String> map = new HashMap<String, String>(); + // Job & Task + if (this.isLocalMode) + map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName()); + else { + map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName()); + + // yarn + map.put(YARN_PACKAGE_KEY, jarPath); + map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory)); + map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory)); + map.put(CONTAINER_COUNT_KEY, "1"); + map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome()); + + // Task opts (Heap size = 0.75 container memory) + int heapSize = (int) (0.75 * this.containerMemory); + map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M -XX:+PrintGCDateStamps"); + } + + map.put(JOB_NAME_KEY, ""); + map.put(TASK_CLASS_KEY, ""); + map.put(TASK_INPUTS_KEY, ""); + + // register serializer + map.put("serializers.registry.kryo.class", SamzaKryoSerdeFactory.class.getName()); + + // Serde registration + setKryoRegistration(map, this.kryoRegisterFile); + + return map; + } + + /* + * Helper methods to set different properties in the input map + */ + private static void setJobName(Map<String, String> map, String jobName) { + map.put(JOB_NAME_KEY, jobName); + } + + private static void setFileName(Map<String, String> map, String filename) { + map.put(FILE_KEY, filename); + } + + private static void setFileSystem(Map<String, String> map, String filesystem) { + map.put(FILESYSTEM_KEY, filesystem); + } + + private static void setTaskClass(Map<String, String> map, String taskClass) { + map.put(TASK_CLASS_KEY, 
taskClass); + } + + private static void setTaskInputs(Map<String, String> map, String inputs) { + map.put(TASK_INPUTS_KEY, inputs); + } + + private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) { + if (kryoRegisterFile != null) { + String value = readKryoRegistration(kryoRegisterFile); + map.put(SERDE_REGISTRATION_KEY, value); + } + } + + private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) { + int res = parallelism / piPerContainer; + if (parallelism % piPerContainer != 0) + res++; + map.put(CONTAINER_COUNT_KEY, Integer.toString(res)); + } + + private static void setKafkaSystem(Map<String, String> map, String systemName, String zk, String brokers, + int batchSize) { + map.put("systems." + systemName + ".samza.factory", KafkaSystemFactory.class.getName()); + map.put("systems." + systemName + ".samza.msg.serde", "kryo"); + + map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk); + map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers); + map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, Integer.toString(batchSize)); + + map.put("systems." + systemName + ".samza.offset.default", "oldest"); + + if (batchSize > 1) { + map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "async"); + } + else { + map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync"); + } + } + + // Set custom properties + private static void setValue(Map<String, String> map, String key, String value) { + map.put(key, value); + } + + /* + * Helper method to parse Kryo registration file + */ + private static String readKryoRegistration(String filePath) { + FileInputStream fis = null; + Properties props = new Properties(); + StringBuilder result = new StringBuilder(); + try { + fis = new FileInputStream(filePath); + props.load(fis); + + boolean first = true; + String value = null; + for (String k : props.stringPropertyNames()) { + if (!first) + result.append(COMMA); + else + first = false; + + // Need to avoid the dollar sign as samza pass all the properties in + // the config to containers via commandline parameters/enviroment + // variables + // We might escape the dollar sign, but it's more complicated than + // replacing it with something else + result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); + value = props.getProperty(k); + if (value != null && value.trim().length() > 0) { + result.append(COLON); + result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); + } + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + if (fis != null) + try { + fis.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + return result.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java index 8e9e446..cd4b846 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java @@ -34,122 +34,126 @@ import 
com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; /** - * Implementation of Samza's SerdeFactory - * that uses Kryo to serialize/deserialize objects + * Implementation of Samza's SerdeFactory that uses Kryo to + * serialize/deserialize objects * * @author Anh Thu Vu * @param <T> - * + * */ public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> { - - private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class); - - public static class SamzaKryoSerde<V> implements Serde<V> { - private Kryo kryo; - - public SamzaKryoSerde (String registrationInfo) { - this.kryo = new Kryo(); - this.register(registrationInfo); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void register(String registrationInfo) { - if (registrationInfo == null) return; - - String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA); - - Class targetClass = null; - Class serializerClass = null; - Serializer serializer = null; - - for (String info:infoList) { - String[] fields = info.split(SamzaConfigFactory.COLON); - - targetClass = null; - serializerClass = null; - if (fields.length > 0) { - try { - targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, SamzaConfigFactory.DOLLAR_SIGN)); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - if (fields.length > 1) { - try { - serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, SamzaConfigFactory.DOLLAR_SIGN)); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - if (targetClass != null) { - if (serializerClass == null) { - kryo.register(targetClass); - } - else { - serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>)serializerClass) ; - kryo.register(targetClass, serializer); - } - } - else { - logger.info("Invalid registration info:{}",info); - } - } - } - - @SuppressWarnings("rawtypes") - private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? 
extends Serializer> serializerClass) { - try { - try { - return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass); - } catch (Exception ex1) { - try { - return serializerClass.getConstructor(Kryo.class).newInstance(k); - } catch (Exception ex2) { - try { - return serializerClass.getConstructor(Class.class).newInstance(superClass); - } catch (Exception ex3) { - return serializerClass.newInstance(); - } - } - } - } catch (Exception ex) { - throw new IllegalArgumentException("Unable to create serializer \"" - + serializerClass.getName() - + "\" for class: " - + superClass.getName(), ex); - } - } - - /* - * Implement Samza Serde interface - */ - @Override - public byte[] toBytes(V obj) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeClassAndObject(output, obj); - output.flush(); - output.close(); - return bos.toByteArray(); - } - - @SuppressWarnings("unchecked") - @Override - public V fromBytes(byte[] byteArr) { - Input input = new Input(byteArr); - Object obj = kryo.readClassAndObject(input); - input.close(); - return (V)obj; - } - - } - - @Override - public Serde<T> getSerde(String name, Config config) { - return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY)); - } + + private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class); + + public static class SamzaKryoSerde<V> implements Serde<V> { + private Kryo kryo; + + public SamzaKryoSerde(String registrationInfo) { + this.kryo = new Kryo(); + this.register(registrationInfo); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void register(String registrationInfo) { + if (registrationInfo == null) + return; + + String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA); + + Class targetClass = null; + Class serializerClass = null; + Serializer serializer = null; + + for (String info : infoList) { + String[] fields = info.split(SamzaConfigFactory.COLON); + + targetClass = null; + serializerClass = null; + if (fields.length > 0) { + try { + targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, + SamzaConfigFactory.DOLLAR_SIGN)); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + if (fields.length > 1) { + try { + serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, + SamzaConfigFactory.DOLLAR_SIGN)); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + if (targetClass != null) { + if (serializerClass == null) { + kryo.register(targetClass); + } + else { + serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>) serializerClass); + kryo.register(targetClass, serializer); + } + } + else { + logger.info("Invalid registration info:{}", info); + } + } + } + + @SuppressWarnings("rawtypes") + private static Serializer resolveSerializerInstance(Kryo k, Class superClass, + Class<? 
extends Serializer> serializerClass) { + try { + try { + return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass); + } catch (Exception ex1) { + try { + return serializerClass.getConstructor(Kryo.class).newInstance(k); + } catch (Exception ex2) { + try { + return serializerClass.getConstructor(Class.class).newInstance(superClass); + } catch (Exception ex3) { + return serializerClass.newInstance(); + } + } + } + } catch (Exception ex) { + throw new IllegalArgumentException("Unable to create serializer \"" + + serializerClass.getName() + + "\" for class: " + + superClass.getName(), ex); + } + } + + /* + * Implement Samza Serde interface + */ + @Override + public byte[] toBytes(V obj) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + kryo.writeClassAndObject(output, obj); + output.flush(); + output.close(); + return bos.toByteArray(); + } + + @SuppressWarnings("unchecked") + @Override + public V fromBytes(byte[] byteArr) { + Input input = new Input(byteArr); + Object obj = kryo.readClassAndObject(input); + input.close(); + return (V) obj; + } + + } + + @Override + public Serde<T> getSerde(String name, Config config) { + return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY)); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java index 4bdbafd..62fa3fd 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java @@ -37,34 +37,34 @@ import com.esotericsoftware.kryo.io.Output; * @author Anh Thu Vu */ public class SerializableSerializer extends Serializer<Object> { - @Override - public void write(Kryo kryo, Output output, Object object) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(object); - oos.flush(); - } catch(IOException e) { - throw new RuntimeException(e); - } - byte[] ser = bos.toByteArray(); - output.writeInt(ser.length); - output.writeBytes(ser); + @Override + public void write(Kryo kryo, Output output, Object object) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(object); + oos.flush(); + } catch (IOException e) { + throw new RuntimeException(e); } - - @SuppressWarnings("rawtypes") - @Override - public Object read(Kryo kryo, Input input, Class c) { - int len = input.readInt(); - byte[] ser = new byte[len]; - input.readBytes(ser); - ByteArrayInputStream bis = new ByteArrayInputStream(ser); - try { - ObjectInputStream ois = new ObjectInputStream(bis); - return ois.readObject(); - } catch(Exception e) { - throw new RuntimeException(e); - } + byte[] ser = bos.toByteArray(); + output.writeInt(ser.length); + output.writeBytes(ser); + } + + @SuppressWarnings("rawtypes") + @Override + public Object read(Kryo kryo, Input input, Class c) { + int len = input.readInt(); + byte[] ser = new byte[len]; + input.readBytes(ser); + ByteArrayInputStream bis = new ByteArrayInputStream(ser); + try { + ObjectInputStream ois = new 
ObjectInputStream(bis); + return ois.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java index 367f9f9..f8e6dcd 100644 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java +++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java @@ -45,339 +45,342 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Utilities methods for: - * - Kafka - * - HDFS - * - Handling files on local FS + * Utilities methods for: - Kafka - HDFS - Handling files on local FS * * @author Anh Thu Vu */ public class SystemsUtils { - private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class); - - public static final String HDFS = "hdfs"; - public static final String LOCAL_FS = "local"; - - private static final String TEMP_FILE = "samoaTemp"; - private static final String TEMP_FILE_SUFFIX = ".dat"; - - /* - * Kafka - */ - private static class KafkaUtils { - private static ZkClient zkClient; - - static void setZookeeper(String zk) { - zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper()); - } - - /* - * Create Kafka topic/stream - */ - static void createKafkaTopic(String name, int partitions, int replicas) { - AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties()); - } - - static class ZKStringSerializerWrapper implements ZkSerializer { - @Override - public Object deserialize(byte[] byteArray) throws ZkMarshallingError { - return ZKStringSerializer.deserialize(byteArray); - } - - @Override - public byte[] serialize(Object obj) throws ZkMarshallingError { - return ZKStringSerializer.serialize(obj); - } - } - } - - /* - * HDFS - */ - private static class HDFSUtils { - private static String coreConfPath; - private static String hdfsConfPath; - private static String configHomePath; - private static String samoaDir = null; - - static void setHadoopConfigHome(String hadoopConfPath) { - logger.info("Hadoop config home:{}",hadoopConfPath); - configHomePath = hadoopConfPath; - java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml"); - java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml"); - coreConfPath = coreSitePath.toAbsolutePath().toString(); - hdfsConfPath = hdfsSitePath.toAbsolutePath().toString(); - } - - static String getNameNodeUri() { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - return config.get("fs.defaultFS"); - } - - static String getHadoopConfigHome() { - return configHomePath; - } - - static void setSAMOADir(String dir) { - if (dir != null) - samoaDir = getNameNodeUri()+dir; - else - samoaDir = null; - } - - static String getDefaultSAMOADir() throws IOException { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - FileSystem fs = FileSystem.get(config); - Path defaultDir = new Path(fs.getHomeDirectory(),".samoa"); - return defaultDir.toString(); - } - - static boolean deleteFileIfExist(String absPath) { - Path p = new Path(absPath); 
- return deleteFileIfExist(p); - } - - static boolean deleteFileIfExist(Path p) { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - FileSystem fs; - try { - fs = FileSystem.get(config); - if (fs.exists(p)) { - return fs.delete(p, false); - } - else - return true; - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return false; - } - - /* - * Write to HDFS - */ - static String writeToHDFS(File file, String dstPath) { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - logger.info("Filesystem name:{}",config.get("fs.defaultFS")); - - // Default samoaDir - if (samoaDir == null) { - try { - samoaDir = getDefaultSAMOADir(); - } - catch (IOException e) { - e.printStackTrace(); - return null; - } - } - - // Setup src and dst paths - //java.nio.file.Path tempPath = FileSystems.getDefault().getPath(samoaDir, dstPath); - Path dst = new Path(samoaDir,dstPath); - Path src = new Path(file.getAbsolutePath()); - - // Delete file if already exists in HDFS - if (deleteFileIfExist(dst) == false) - return null; - - // Copy to HDFS - FileSystem fs; - try { - fs = FileSystem.get(config); - fs.copyFromLocalFile(src, dst); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - - return dst.toString(); // abs path to file - } - - /* - * Read from HDFS - */ - static Object deserializeObjectFromFile(String filePath) { - logger.info("Deserialize HDFS file:{}",filePath); - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - Path file = new Path(filePath); - FSDataInputStream dataInputStream = null; - ObjectInputStream ois = null; - Object obj = null; - FileSystem fs; - try { - fs = FileSystem.get(config); - dataInputStream = fs.open(file); - ois = new ObjectInputStream(dataInputStream); - obj = ois.readObject(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (ClassNotFoundException e) { - try { - if (dataInputStream != null) dataInputStream.close(); - if (ois != null) ois.close(); - } catch (IOException ioException) { - // TODO auto-generated catch block - e.printStackTrace(); - } - } - - return obj; - } - - } - - private static class LocalFileSystemUtils { - static boolean serializObjectToFile(Object obj, String fn) { - FileOutputStream fos = null; - ObjectOutputStream oos = null; - try { - fos = new FileOutputStream(fn); - oos = new ObjectOutputStream(fos); - oos.writeObject(obj); - } catch (FileNotFoundException e) { - e.printStackTrace(); - return false; - } catch (IOException e) { - e.printStackTrace(); - return false; - } finally { - try { - if (fos != null) fos.close(); - if (oos != null) oos.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - return true; - } - - static Object deserializeObjectFromLocalFile(String filename) { - logger.info("Deserialize local file:{}",filename); - FileInputStream fis = null; - ObjectInputStream ois = null; - Object obj = null; - try { - fis = new FileInputStream(filename); - ois = new ObjectInputStream(fis); - obj = ois.readObject(); - } catch (IOException e) { - // TODO auto-generated catch block - e.printStackTrace(); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - try { - if (fis != null) fis.close(); - if (ois != 
null) ois.close(); - } catch (IOException e) { - // TODO auto-generated catch block - e.printStackTrace(); - } - } - - return obj; - } - } - - - - /* - * Create streams - */ - public static void createKafkaTopic(String name, int partitions) { - createKafkaTopic(name, partitions, 1); - } - - public static void createKafkaTopic(String name, int partitions, int replicas) { - KafkaUtils.createKafkaTopic(name, partitions, replicas); - } - - /* - * Serialize object - */ - public static boolean serializeObjectToLocalFileSystem(Object object, String path) { - return LocalFileSystemUtils.serializObjectToFile(object, path); - } - - public static String serializeObjectToHDFS(Object object, String path) { - File tmpDatFile; - try { - tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX); - if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) { - return HDFSUtils.writeToHDFS(tmpDatFile, path); - } - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - } - - /* - * Deserialize object - */ - @SuppressWarnings("unchecked") - public static Map<String,Object> deserializeMapFromFile(String filesystem, String filename) { - Map<String,Object> map; - if (filesystem.equals(HDFS)) { - map = (Map<String,Object>) HDFSUtils.deserializeObjectFromFile(filename); - } - else { - map = (Map<String,Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename); - } - return map; - } - - public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) { - Map<String,Object> map = deserializeMapFromFile(filesystem, filename); - if (map == null) return null; - return map.get(key); - } - - /* - * Setup - */ - public static void setZookeeper(String zookeeper) { - KafkaUtils.setZookeeper(zookeeper); - } - - public static void setHadoopConfigHome(String hadoopHome) { - HDFSUtils.setHadoopConfigHome(hadoopHome); - } - - public static void setSAMOADir(String samoaDir) { - HDFSUtils.setSAMOADir(samoaDir); - } - - /* - * Others - */ - public static String getHDFSNameNodeUri() { - return HDFSUtils.getNameNodeUri(); - } - public static String getHadoopConfigHome() { - return HDFSUtils.getHadoopConfigHome(); - } - - public static String copyToHDFS(File file, String dstPath) { - return HDFSUtils.writeToHDFS(file, dstPath); - } + private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class); + + public static final String HDFS = "hdfs"; + public static final String LOCAL_FS = "local"; + + private static final String TEMP_FILE = "samoaTemp"; + private static final String TEMP_FILE_SUFFIX = ".dat"; + + /* + * Kafka + */ + private static class KafkaUtils { + private static ZkClient zkClient; + + static void setZookeeper(String zk) { + zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper()); + } + + /* + * Create Kafka topic/stream + */ + static void createKafkaTopic(String name, int partitions, int replicas) { + AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties()); + } + + static class ZKStringSerializerWrapper implements ZkSerializer { + @Override + public Object deserialize(byte[] byteArray) throws ZkMarshallingError { + return ZKStringSerializer.deserialize(byteArray); + } + + @Override + public byte[] serialize(Object obj) throws ZkMarshallingError { + return ZKStringSerializer.serialize(obj); + } + } + } + + /* + * HDFS + */ + private static class HDFSUtils { + private static String coreConfPath; + private static String hdfsConfPath; + private 
static String configHomePath; + private static String samoaDir = null; + + static void setHadoopConfigHome(String hadoopConfPath) { + logger.info("Hadoop config home:{}", hadoopConfPath); + configHomePath = hadoopConfPath; + java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml"); + java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml"); + coreConfPath = coreSitePath.toAbsolutePath().toString(); + hdfsConfPath = hdfsSitePath.toAbsolutePath().toString(); + } + + static String getNameNodeUri() { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + return config.get("fs.defaultFS"); + } + + static String getHadoopConfigHome() { + return configHomePath; + } + + static void setSAMOADir(String dir) { + if (dir != null) + samoaDir = getNameNodeUri() + dir; + else + samoaDir = null; + } + + static String getDefaultSAMOADir() throws IOException { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + FileSystem fs = FileSystem.get(config); + Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa"); + return defaultDir.toString(); + } + + static boolean deleteFileIfExist(String absPath) { + Path p = new Path(absPath); + return deleteFileIfExist(p); + } + + static boolean deleteFileIfExist(Path p) { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + FileSystem fs; + try { + fs = FileSystem.get(config); + if (fs.exists(p)) { + return fs.delete(p, false); + } + else + return true; + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return false; + } + + /* + * Write to HDFS + */ + static String writeToHDFS(File file, String dstPath) { + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + logger.info("Filesystem name:{}", config.get("fs.defaultFS")); + + // Default samoaDir + if (samoaDir == null) { + try { + samoaDir = getDefaultSAMOADir(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + // Setup src and dst paths + // java.nio.file.Path tempPath = + // FileSystems.getDefault().getPath(samoaDir, dstPath); + Path dst = new Path(samoaDir, dstPath); + Path src = new Path(file.getAbsolutePath()); + + // Delete file if already exists in HDFS + if (deleteFileIfExist(dst) == false) + return null; + + // Copy to HDFS + FileSystem fs; + try { + fs = FileSystem.get(config); + fs.copyFromLocalFile(src, dst); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + return dst.toString(); // abs path to file + } + + /* + * Read from HDFS + */ + static Object deserializeObjectFromFile(String filePath) { + logger.info("Deserialize HDFS file:{}", filePath); + Configuration config = new Configuration(); + config.addResource(new Path(coreConfPath)); + config.addResource(new Path(hdfsConfPath)); + + Path file = new Path(filePath); + FSDataInputStream dataInputStream = null; + ObjectInputStream ois = null; + Object obj = null; + FileSystem fs; + try { + fs = FileSystem.get(config); + dataInputStream = fs.open(file); + ois = new ObjectInputStream(dataInputStream); + obj = ois.readObject(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch 
(ClassNotFoundException e) { + try { + if (dataInputStream != null) + dataInputStream.close(); + if (ois != null) + ois.close(); + } catch (IOException ioException) { + // TODO auto-generated catch block + e.printStackTrace(); + } + } + + return obj; + } + + } + + private static class LocalFileSystemUtils { + static boolean serializObjectToFile(Object obj, String fn) { + FileOutputStream fos = null; + ObjectOutputStream oos = null; + try { + fos = new FileOutputStream(fn); + oos = new ObjectOutputStream(fos); + oos.writeObject(obj); + } catch (FileNotFoundException e) { + e.printStackTrace(); + return false; + } catch (IOException e) { + e.printStackTrace(); + return false; + } finally { + try { + if (fos != null) + fos.close(); + if (oos != null) + oos.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + return true; + } + + static Object deserializeObjectFromLocalFile(String filename) { + logger.info("Deserialize local file:{}", filename); + FileInputStream fis = null; + ObjectInputStream ois = null; + Object obj = null; + try { + fis = new FileInputStream(filename); + ois = new ObjectInputStream(fis); + obj = ois.readObject(); + } catch (IOException e) { + // TODO auto-generated catch block + e.printStackTrace(); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + try { + if (fis != null) + fis.close(); + if (ois != null) + ois.close(); + } catch (IOException e) { + // TODO auto-generated catch block + e.printStackTrace(); + } + } + + return obj; + } + } + + /* + * Create streams + */ + public static void createKafkaTopic(String name, int partitions) { + createKafkaTopic(name, partitions, 1); + } + + public static void createKafkaTopic(String name, int partitions, int replicas) { + KafkaUtils.createKafkaTopic(name, partitions, replicas); + } + + /* + * Serialize object + */ + public static boolean serializeObjectToLocalFileSystem(Object object, String path) { + return LocalFileSystemUtils.serializObjectToFile(object, path); + } + + public static String serializeObjectToHDFS(Object object, String path) { + File tmpDatFile; + try { + tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX); + if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) { + return HDFSUtils.writeToHDFS(tmpDatFile, path); + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + } + + /* + * Deserialize object + */ + @SuppressWarnings("unchecked") + public static Map<String, Object> deserializeMapFromFile(String filesystem, String filename) { + Map<String, Object> map; + if (filesystem.equals(HDFS)) { + map = (Map<String, Object>) HDFSUtils.deserializeObjectFromFile(filename); + } + else { + map = (Map<String, Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename); + } + return map; + } + + public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) { + Map<String, Object> map = deserializeMapFromFile(filesystem, filename); + if (map == null) + return null; + return map.get(key); + } + + /* + * Setup + */ + public static void setZookeeper(String zookeeper) { + KafkaUtils.setZookeeper(zookeeper); + } + + public static void setHadoopConfigHome(String hadoopHome) { + HDFSUtils.setHadoopConfigHome(hadoopHome); + } + + public static void setSAMOADir(String samoaDir) { + HDFSUtils.setSAMOADir(samoaDir); + } + + /* + * Others + */ + public static String getHDFSNameNodeUri() { + return 
HDFSUtils.getNameNodeUri(); + } + + public static String getHadoopConfigHome() { + return HDFSUtils.getHadoopConfigHome(); + } + + public static String copyToHDFS(File file, String dstPath) { + return HDFSUtils.writeToHDFS(file, dstPath); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java index 54792ae..d6ea26e 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java @@ -34,45 +34,46 @@ import com.yahoo.labs.samoa.topology.impl.StormTopology; /** * The main class to execute a SAMOA task in LOCAL mode in Storm. - * + * * @author Arinto Murdopo - * + * */ public class LocalStormDoTask { - private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class); + private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class); - /** - * The main method. - * - * @param args the arguments - */ - public static void main(String[] args) { + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - int numWorker = StormSamoaUtils.numWorkers(tmpArgs); + int numWorker = StormSamoaUtils.numWorkers(tmpArgs); - args = tmpArgs.toArray(new String[0]); + args = tmpArgs.toArray(new String[0]); - //convert the arguments into Storm topology - StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); - String topologyName = stormTopo.getTopologyName(); + // convert the arguments into Storm topology + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + String topologyName = stormTopo.getTopologyName(); - Config conf = new Config(); - //conf.putAll(Utils.readStormConfig()); - conf.setDebug(false); + Config conf = new Config(); + // conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); - //local mode - conf.setMaxTaskParallelism(numWorker); + // local mode + conf.setMaxTaskParallelism(numWorker); - backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); - cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); + backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster(); + cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology()); - backtype.storm.utils.Utils.sleep(600 * 1000); + backtype.storm.utils.Utils.sleep(600 * 1000); - cluster.killTopology(topologyName); - cluster.shutdown(); + cluster.killTopology(topologyName); + cluster.shutdown(); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java index ad9794d..84a9336 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java @@ -26,40 
+26,41 @@ import com.yahoo.labs.samoa.core.ContentEvent; /** * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class + * * @author Arinto Murdopo - * + * */ -class StormBoltStream extends StormStream{ - - /** +class StormBoltStream extends StormStream { + + /** * */ - private static final long serialVersionUID = -5712513402991550847L; - - private OutputCollector outputCollector; + private static final long serialVersionUID = -5712513402991550847L; + + private OutputCollector outputCollector; + + StormBoltStream(String stormComponentId) { + super(stormComponentId); + } - StormBoltStream(String stormComponentId){ - super(stormComponentId); - } + @Override + public void put(ContentEvent contentEvent) { + outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey())); + } - @Override - public void put(ContentEvent contentEvent) { - outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey())); - } - - public void setCollector(OutputCollector outputCollector){ - this.outputCollector = outputCollector; - } + public void setCollector(OutputCollector outputCollector) { + this.outputCollector = outputCollector; + } -// @Override -// public void setStreamId(String streamId) { -// // TODO Auto-generated method stub -// //this.outputStreamId = streamId; -// } + // @Override + // public void setStreamId(String streamId) { + // // TODO Auto-generated method stub + // //this.outputStreamId = streamId; + // } - @Override - public String getStreamId() { - // TODO Auto-generated method stub - return null; - } + @Override + public String getStreamId() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java index 347fd50..9a2bc65 100644 --- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java @@ -37,54 +37,54 @@ import com.yahoo.labs.samoa.topology.Topology; */ public final class StormComponentFactory implements ComponentFactory { - private final Map<String, Integer> processorList; - - public StormComponentFactory() { - processorList = new HashMap<>(); - } - - @Override - public ProcessingItem createPi(Processor processor) { - return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), 1); + private final Map<String, Integer> processorList; + + public StormComponentFactory() { + processorList = new HashMap<>(); + } + + @Override + public ProcessingItem createPi(Processor processor) { + return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), 1); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new StormEntranceProcessingItem(processor, this.getComponentName(processor.getClass())); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi; + return stormCompatiblePi.createStream(); + } + + @Override + public Topology createTopology(String topoName) { + return new StormTopology(topoName); + } 
+ + private String getComponentName(Class<? extends Processor> clazz) { + StringBuilder componentName = new StringBuilder(clazz.getCanonicalName()); + String key = componentName.toString(); + Integer index; + + if (!processorList.containsKey(key)) { + index = 1; + } else { + index = processorList.get(key) + 1; } - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - return new StormEntranceProcessingItem(processor, this.getComponentName(processor.getClass())); - } + processorList.put(key, index); - @Override - public Stream createStream(IProcessingItem sourcePi) { - StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi; - return stormCompatiblePi.createStream(); - } - - @Override - public Topology createTopology(String topoName) { - return new StormTopology(topoName); - } + componentName.append('_'); + componentName.append(index); - private String getComponentName(Class<? extends Processor> clazz) { - StringBuilder componentName = new StringBuilder(clazz.getCanonicalName()); - String key = componentName.toString(); - Integer index; + return componentName.toString(); + } - if (!processorList.containsKey(key)) { - index = 1; - } else { - index = processorList.get(key) + 1; - } - - processorList.put(key, index); - - componentName.append('_'); - componentName.append(index); - - return componentName.toString(); - } - - @Override - public ProcessingItem createPi(Processor processor, int parallelism) { - return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism); - } + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism); + } }
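Editor's note on the reformatted StormComponentFactory above: getComponentName() gives every Storm component a unique id of the form <canonical class name>_<counter>, keeping one counter per Processor class in the processorList map. A minimal, self-contained sketch of that counter logic follows; the class ComponentNameDemo and the use of String.class are hypothetical stand-ins for illustration, not SAMOA code.

import java.util.HashMap;
import java.util.Map;

// Hypothetical demo of the per-class counter used by
// StormComponentFactory.getComponentName(); not part of the commit.
public class ComponentNameDemo {
  private static final Map<String, Integer> processorList = new HashMap<>();

  static String getComponentName(Class<?> clazz) {
    String key = clazz.getCanonicalName();
    // First registration of a class gets index 1; repeats increment it.
    Integer index = processorList.containsKey(key) ? processorList.get(key) + 1 : 1;
    processorList.put(key, index);
    return key + "_" + index;
  }

  public static void main(String[] args) {
    System.out.println(getComponentName(String.class)); // java.lang.String_1
    System.out.println(getComponentName(String.class)); // java.lang.String_2
  }
}

This mirrors the factory's guarantee that instantiating the same Processor class more than once in a topology never reuses a component name.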

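Editor's note on the SystemsUtils diff above: the reformatted serialization helpers still close streams by hand in finally blocks, and the HDFS deserializer's nested catch prints the outer exception (e) rather than the ioException it just caught. A try-with-resources equivalent of the local-filesystem pair is sketched below, assuming the originals' null/false-on-failure contract should be preserved; the class name LocalSerializationSketch is hypothetical and this is not the project's code.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical sketch: try-with-resources versions of the
// LocalFileSystemUtils helpers shown in the diff; not SAMOA code.
public final class LocalSerializationSketch {

  static boolean serializeObjectToFile(Object obj, String fileName) {
    // Streams close automatically, innermost first, even on exceptions.
    try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(fileName))) {
      oos.writeObject(obj);
      return true;
    } catch (IOException e) {
      e.printStackTrace();
      return false; // matches the false-on-failure contract of serializObjectToFile()
    }
  }

  static Object deserializeObjectFromFile(String fileName) {
    try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(fileName))) {
      return ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      e.printStackTrace();
      return null; // matches the null-on-failure contract of the original
    }
  }
}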