// Origin: samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
// (incubator-samoa, blob 4b9b7ce as referenced from commit 9b178f63, which deletes this file).
package com.yahoo.labs.samoa.topology.impl;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.List;
import java.util.Set;

import org.apache.samza.config.MapConfig;
import org.apache.samza.job.JobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.labs.samoa.topology.Stream;
import com.yahoo.labs.samoa.topology.Topology;
import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
import com.yahoo.labs.samoa.utils.SystemsUtils;

/**
 * Submits a list of Samza jobs, using the configs generated from an input topology.
 *
 * <p>
 * The class keeps a single shared instance (see {@link #getEngine()}). That instance is configured through the
 * chainable setters and then used by the static {@link #submitTopology(SamzaTopology)} entry point. The setters only
 * store values; nothing is validated or applied until a topology is submitted.
 *
 * @author Anh Thu Vu
 */
public class SamzaEngine {

  private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class);

  /*
   * Singleton instance
   */
  private static final SamzaEngine engine = new SamzaEngine();

  private String zookeeper;
  private String kafka;
  private int kafkaReplicationFactor;
  private boolean isLocalMode;
  private String yarnPackagePath;
  private String yarnConfHome;

  private String kryoRegisterFile;

  private int amMem;
  private int containerMem;
  private int piPerContainerRatio;

  private int checkpointFrequency;

  /**
   * Generates one config per job for the topology, creates the Kafka topics backing its streams, and then runs one
   * Samza job per generated config.
   *
   * <p>
   * If config generation fails, the error is logged and the method returns without creating any topic or running any
   * job.
   *
   * @param topology
   *          the topology to submit
   */
  private void _submitTopology(SamzaTopology topology) {
    SamzaConfigFactory configFactory = newConfigFactory();

    // Generate the list of Configs
    List<MapConfig> configs;
    try {
      // ConfigFactory generate a list of configs
      // Serialize a map of PIs and store in a file in the jar at jarFilePath
      // (in dat/ folder)
      configs = configFactory.getMapConfigsForTopology(topology);
    } catch (Exception e) {
      // Report through the class logger (not stderr) so the failure follows the configured logging setup.
      logger.error("Failed to generate Samza configs for the topology; no job has been submitted", e);
      return;
    }

    createKafkaTopics(topology);
    runJobs(configs);
  }

  /** Builds a SamzaConfigFactory populated with the settings currently stored in this engine. */
  private SamzaConfigFactory newConfigFactory() {
    SamzaConfigFactory configFactory = new SamzaConfigFactory();
    configFactory.setLocalMode(isLocalMode)
        .setZookeeper(zookeeper)
        .setKafka(kafka)
        .setYarnPackage(yarnPackagePath)
        .setAMMemory(amMem)
        .setContainerMemory(containerMem)
        .setPiPerContainerRatio(piPerContainerRatio)
        .setKryoRegisterFile(kryoRegisterFile)
        .setCheckpointFrequency(checkpointFrequency)
        .setReplicationFactor(kafkaReplicationFactor);
    return configFactory;
  }

  /** Creates one Kafka topic for every system stream of every stream in the topology. */
  private void createKafkaTopics(SamzaTopology topology) {
    Set<Stream> streams = topology.getStreams();
    for (Stream stream : streams) {
      SamzaStream samzaStream = (SamzaStream) stream;
      List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams();
      for (SamzaSystemStream systemStream : systemStreams) {
        // all streams should be kafka streams
        SystemsUtils.createKafkaTopic(systemStream.getStream(), systemStream.getParallelism(),
            kafkaReplicationFactor);
      }
    }
  }

  /** Logs each config and runs a Samza JobRunner for it, in list order. */
  private void runJobs(List<MapConfig> configs) {
    for (MapConfig config : configs) {
      logger.info("Config:{}", config);
      JobRunner jobRunner = new JobRunner(config);
      jobRunner.run();
    }
  }

  /**
   * Pushes the engine settings that SystemsUtils needs before a submission: the Hadoop config home (only when not in
   * local mode) and the ZooKeeper address (always).
   */
  private void _setupSystemsUtils() {
    // Setup Utils
    if (!isLocalMode) {
      SystemsUtils.setHadoopConfigHome(yarnConfHome);
    }
    // Not part of the conditional above: ZooKeeper is set in local mode as well.
    SystemsUtils.setZookeeper(zookeeper);
  }

  /**
   * Returns the shared engine instance, i.e. the one used by {@link #submitTopology(SamzaTopology)}.
   *
   * @return the singleton engine
   */
  public static SamzaEngine getEngine() {
    return engine;
  }

  /*
   * Setter methods (each returns this engine so calls can be chained)
   */

  /**
   * Sets the ZooKeeper address handed to SystemsUtils and to the config factory.
   *
   * @param zk
   *          the ZooKeeper address
   * @return this engine
   */
  public SamzaEngine setZooKeeper(String zk) {
    this.zookeeper = zk;
    return this;
  }

  /**
   * Sets the Kafka value forwarded to {@code SamzaConfigFactory.setKafka}.
   *
   * @param kafka
   *          the Kafka setting
   * @return this engine
   */
  public SamzaEngine setKafka(String kafka) {
    this.kafka = kafka;
    return this;
  }

  /**
   * Sets the replication factor used when creating Kafka topics and forwarded to the config factory.
   *
   * @param replicationFactor
   *          the Kafka replication factor
   * @return this engine
   */
  public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
    this.kafkaReplicationFactor = replicationFactor;
    return this;
  }

  /**
   * Sets the checkpoint frequency forwarded to {@code SamzaConfigFactory.setCheckpointFrequency}.
   *
   * @param freq
   *          the checkpoint frequency
   * @return this engine
   */
  public SamzaEngine setCheckpointFrequency(int freq) {
    this.checkpointFrequency = freq;
    return this;
  }

  /**
   * Selects local mode. When local mode is off, the Hadoop config home is handed to SystemsUtils on submission.
   *
   * @param isLocal
   *          {@code true} for local mode
   * @return this engine
   */
  public SamzaEngine setLocalMode(boolean isLocal) {
    this.isLocalMode = isLocal;
    return this;
  }

  /**
   * Sets the package path forwarded to {@code SamzaConfigFactory.setYarnPackage}.
   *
   * @param yarnPackagePath
   *          the YARN package path
   * @return this engine
   */
  public SamzaEngine setYarnPackage(String yarnPackagePath) {
    this.yarnPackagePath = yarnPackagePath;
    return this;
  }

  /**
   * Sets the config home passed to {@code SystemsUtils.setHadoopConfigHome} when not running in local mode.
   *
   * @param configHome
   *          the Hadoop/YARN config home
   * @return this engine
   */
  public SamzaEngine setConfigHome(String configHome) {
    this.yarnConfHome = configHome;
    return this;
  }

  /**
   * Sets the value forwarded to {@code SamzaConfigFactory.setAMMemory}.
   *
   * @param mem
   *          the AM memory setting
   * @return this engine
   */
  public SamzaEngine setAMMemory(int mem) {
    this.amMem = mem;
    return this;
  }

  /**
   * Sets the value forwarded to {@code SamzaConfigFactory.setContainerMemory}.
   *
   * @param mem
   *          the container memory setting
   * @return this engine
   */
  public SamzaEngine setContainerMemory(int mem) {
    this.containerMem = mem;
    return this;
  }

  /**
   * Sets the value forwarded to {@code SamzaConfigFactory.setPiPerContainerRatio}.
   *
   * @param piPerContainer
   *          the PI-per-container ratio
   * @return this engine
   */
  public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
    this.piPerContainerRatio = piPerContainer;
    return this;
  }

  /**
   * Sets the file forwarded to {@code SamzaConfigFactory.setKryoRegisterFile}.
   *
   * @param registerFile
   *          the Kryo register file
   * @return this engine
   */
  public SamzaEngine setKryoRegisterFile(String registerFile) {
    this.kryoRegisterFile = registerFile;
    return this;
  }

  /**
   * Submits the list of Samza jobs corresponding to the given topology, using the settings stored in the shared
   * engine instance.
   *
   * @param topo
   *          the submitted topology
   */
  public static void submitTopology(SamzaTopology topo) {
    // Setup SystemsUtils
    engine._setupSystemsUtils();

    // Submit topology
    engine._submitTopology(topo);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java deleted file mode 100644 index bfbd40c..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java +++ /dev/null @@ -1,229 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.io.Serializable; -import java.util.concurrent.TimeUnit; - -import org.apache.samza.Partition; -import org.apache.samza.config.Config; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.BlockingEnvelopeMap; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.utils.SamzaConfigFactory; -import com.yahoo.labs.samoa.utils.SystemsUtils; - -/** - * EntranceProcessingItem for Samza which is also a Samza task (StreamTask & InitableTask) - * - * @author Anh Thu Vu - * - */ -public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem - implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { - - /** - * - */ - private static final long serialVersionUID = 7157734520046135039L; - - /* - * Constructors - */ - public SamzaEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // Need this so Samza can initialize a StreamTask - public SamzaEntranceProcessingItem() { - } - - /* - * Simple setters, getters - */ - @Override - public int addOutputStream(SamzaStream stream) { - this.setOutputStream(stream); - return 1; // entrance PI should have only 1 output stream - } - - /* - * Serialization - */ - private Object writeReplace() { - return new SerializationProxy(this); - } - - private static class SerializationProxy implements Serializable { - /** - * - */ - private static final long serialVersionUID = 313907132721414634L; - - private 
EntranceProcessor processor; - private SamzaStream outputStream; - private String name; - - public SerializationProxy(SamzaEntranceProcessingItem epi) { - this.processor = epi.getProcessor(); - this.outputStream = (SamzaStream) epi.getOutputStream(); - this.name = epi.getName(); - } - } - - /* - * Implement Samza Task - */ - @Override - public void init(Config config, TaskContext context) throws Exception { - String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); - if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set, otherwise, assume we are running in local mode and ignore this - SystemsUtils.setHadoopConfigHome(yarnConfHome); - - String filename = config.get(SamzaConfigFactory.FILE_KEY); - String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); - - this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); - SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, - filename, this.getName()); - this.setOutputStream(wrapper.outputStream); - SamzaStream output = (SamzaStream) this.getOutputStream(); - if (output != null) // if output stream exists, set it up - output.onCreate(); - } - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) - throws Exception { - SamzaStream output = (SamzaStream) this.getOutputStream(); - if (output == null) - return; // if there is no output stream, do nothing - output.setCollector(collector); - ContentEvent event = (ContentEvent) envelope.getMessage(); - output.put(event); - } - - /* - * Implementation of Samza's SystemConsumer to get events from source and feed - * to SAMOA system - */ - /* - * Current implementation: buffer the incoming events and send a batch of them - * when poll() is called by Samza system. 
- * - * Currently: it has a "soft" limit on the size of the buffer: when the buffer - * size reaches the limit, the reading thread will sleep for 100ms. A hard - * limit can be achieved by overriding the method protected - * BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() of - * BlockingEnvelopeMap But then we have handle the case when the queue is - * full. - */ - public static class SamoaSystemConsumer extends BlockingEnvelopeMap { - - private EntranceProcessor entranceProcessor = null; - private SystemStreamPartition systemStreamPartition; - - private static final Logger logger = LoggerFactory.getLogger(SamoaSystemConsumer.class); - - public SamoaSystemConsumer(String systemName, Config config) { - String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); - if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set, otherwise, assume we are running in local mode and ignore this - SystemsUtils.setHadoopConfigHome(yarnConfHome); - - String filename = config.get(SamzaConfigFactory.FILE_KEY); - String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); - String name = config.get(SamzaConfigFactory.JOB_NAME_KEY); - SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, - filename, name); - - this.entranceProcessor = wrapper.processor; - this.entranceProcessor.onCreate(0); - - // Internal stream from SystemConsumer to EntranceTask, so we - // need only one partition - this.systemStreamPartition = new SystemStreamPartition(systemName, wrapper.name, new Partition(0)); - } - - @Override - public void start() { - Thread processorPollingThread = new Thread( - new Runnable() { - @Override - public void run() { - try { - pollingEntranceProcessor(); - setIsAtHead(systemStreamPartition, true); - } catch (InterruptedException e) { - e.getStackTrace(); - stop(); - } - } - } - ); - - processorPollingThread.start(); - } - - @Override - public void stop() { - - } - - private 
void pollingEntranceProcessor() throws InterruptedException { - int messageCnt = 0; - while (!this.entranceProcessor.isFinished()) { - messageCnt = this.getNumMessagesInQueue(systemStreamPartition); - if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft - // limit - // on the - // size of - // the - // queue - this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, - this.entranceProcessor.nextEvent())); - } else { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - break; - } - } - } - - // Send last event - this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, - this.entranceProcessor.nextEvent())); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java deleted file mode 100644 index 6998aa2..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java +++ /dev/null @@ -1,167 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.Serializable; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.AbstractProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.SamzaConfigFactory; -import com.yahoo.labs.samoa.utils.SystemsUtils; -import com.yahoo.labs.samoa.utils.StreamDestination; - -import org.apache.samza.config.Config; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - -/** - * ProcessingItem for Samza which is also a Samza task (StreamTask and InitableTask) - * - * @author Anh Thu Vu - */ -public class SamzaProcessingItem extends AbstractProcessingItem - implements SamzaProcessingNode, Serializable, StreamTask, InitableTask { - - /** - * - */ - private static final long serialVersionUID = 1L; - - private Set<SamzaSystemStream> inputStreams; // input streams: system.stream - private List<SamzaStream> outputStreams; - - /* - * Constructors - */ - // Need this so Samza can initialize a StreamTask - public SamzaProcessingItem() { - } - - /* - * Implement com.yahoo.labs.samoa.topology.ProcessingItem - */ - public SamzaProcessingItem(Processor processor, int parallelismHint) { - super(processor, parallelismHint); - this.inputStreams = new HashSet<SamzaSystemStream>(); - this.outputStreams = new LinkedList<SamzaStream>(); - } - - /* 
- * Simple setters, getters - */ - public Set<SamzaSystemStream> getInputStreams() { - return this.inputStreams; - } - - /* - * Extends AbstractProcessingItem - */ - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this, this - .getParallelism(), scheme)); - this.inputStreams.add(stream); - return this; - } - - /* - * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode - */ - @Override - public int addOutputStream(SamzaStream stream) { - this.outputStreams.add(stream); - return this.outputStreams.size(); - } - - public List<SamzaStream> getOutputStreams() { - return this.outputStreams; - } - - /* - * Implement Samza task - */ - @Override - public void init(Config config, TaskContext context) throws Exception { - String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY); - if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set, otherwise, assume we are running in local mode and ignore this // set , otherwise, - SystemsUtils.setHadoopConfigHome(yarnConfHome); - - String filename = config.get(SamzaConfigFactory.FILE_KEY); - String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY); - this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY)); - SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem, - filename, this.getName()); - this.setProcessor(wrapper.processor); - this.outputStreams = wrapper.outputStreams; - - // Init Processor and Streams - this.getProcessor().onCreate(0); - for (SamzaStream stream : this.outputStreams) { - stream.onCreate(); - } - - } - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) - throws Exception { - for (SamzaStream stream : this.outputStreams) { - stream.setCollector(collector); - } - 
this.getProcessor().process((ContentEvent) envelope.getMessage()); - } - - /* - * SerializationProxy - */ - private Object writeReplace() { - return new SerializationProxy(this); - } - - private static class SerializationProxy implements Serializable { - /** - * - */ - private static final long serialVersionUID = 1534643987559070336L; - - private Processor processor; - private List<SamzaStream> outputStreams; - - public SerializationProxy(SamzaProcessingItem pi) { - this.processor = pi.getProcessor(); - this.outputStreams = pi.getOutputStreams(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java deleted file mode 100644 index 8a0e01e..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import com.yahoo.labs.samoa.topology.IProcessingItem; - -/** - * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem - * - * @author Anh Thu Vu - */ -public interface SamzaProcessingNode extends IProcessingItem { - /** - * Registers an output stream with this processing item - * - * @param stream - * the output stream - * @return the number of output streams of this processing item - */ - public int addOutputStream(SamzaStream stream); - - /** - * Gets the name/id of this processing item - * - * @return the name/id of this processing item - */ - // TODO: include getName() and setName() in IProcessingItem and/or - // AbstractEPI/PI - public String getName(); - - /** - * Sets the name/id for this processing item - * - * @param name - * the name/id of this processing item - */ - // TODO: include getName() and setName() in IProcessingItem and/or - // AbstractEPI/PI - public void setName(String name); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java deleted file mode 100644 index 98165d4..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java +++ /dev/null @@ -1,246 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.AbstractStream; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * Stream for SAMOA on Samza - * - * @author Anh Thu Vu - */ -public class SamzaStream extends AbstractStream implements Serializable { - - /** - * - */ - private static final long serialVersionUID = 1L; - - private static final String DEFAULT_SYSTEM_NAME = "kafka"; - - private List<SamzaSystemStream> systemStreams; - private transient MessageCollector collector; - private String systemName; - - /* - * Constructor - */ - public SamzaStream(IProcessingItem sourcePi) { - super(sourcePi); - this.systemName = DEFAULT_SYSTEM_NAME; - // Get name/id for this stream - SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi; - int index = samzaPi.addOutputStream(this); - this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index)); - // init list of SamzaSystemStream - systemStreams = new ArrayList<SamzaSystemStream>(); - } - - /* - * System name (Kafka) - */ - public void setSystemName(String systemName) { - this.systemName = systemName; - for (SamzaSystemStream systemStream : systemStreams) { - 
systemStream.setSystem(systemName); - } - } - - public String getSystemName() { - return this.systemName; - } - - /* - * Add the PI to the list of destinations. Return the name of the - * corresponding SystemStream. - */ - public SamzaSystemStream addDestination(StreamDestination destination) { - PartitioningScheme scheme = destination.getPartitioningScheme(); - int parallelism = destination.getParallelism(); - - SamzaSystemStream resultStream = null; - for (int i = 0; i < systemStreams.size(); i++) { - // There is an existing SystemStream that matches the settings. - // Do not create a new one - if (systemStreams.get(i).isSame(scheme, parallelism)) { - resultStream = systemStreams.get(i); - } - } - - // No existing SystemStream match the requirement - // Create a new one - if (resultStream == null) { - String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size()); - resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism); - systemStreams.add(resultStream); - } - - return resultStream; - } - - public void setCollector(MessageCollector collector) { - this.collector = collector; - } - - public MessageCollector getCollector() { - return this.collector; - } - - public void onCreate() { - for (SamzaSystemStream stream : systemStreams) { - stream.initSystemStream(); - } - } - - /* - * Implement Stream interface - */ - @Override - public void put(ContentEvent event) { - for (SamzaSystemStream stream : systemStreams) { - stream.send(collector, event); - } - } - - public List<SamzaSystemStream> getSystemStreams() { - return this.systemStreams; - } - - /** - * SamzaSystemStream wrap around a Samza's SystemStream It contains the info to create a Samza stream during the - * constructing process of the topology and will create the actual Samza stream when the topology is submitted - * (invoking initSystemStream()) - * - * @author Anh Thu Vu - */ - public static class SamzaSystemStream implements Serializable { - /** - * - */ 
- private static final long serialVersionUID = 1L; - private String system; - private String stream; - private PartitioningScheme scheme; - private int parallelism; - - private transient SystemStream actualSystemStream = null; - - /* - * Constructors - */ - public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) { - this.system = system; - this.stream = stream; - this.scheme = scheme; - this.parallelism = parallelism; - } - - public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) { - this(system, stream, scheme, 1); - } - - /* - * Setters - */ - public void setSystem(String system) { - this.system = system; - } - - /* - * Getters - */ - public String getSystem() { - return this.system; - } - - public String getStream() { - return this.stream; - } - - public PartitioningScheme getPartitioningScheme() { - return this.scheme; - } - - public int getParallelism() { - return this.parallelism; - } - - public boolean isSame(PartitioningScheme scheme, int parallelismHint) { - return (this.scheme == scheme && this.parallelism == parallelismHint); - } - - /* - * Init the actual Samza stream - */ - public void initSystemStream() { - actualSystemStream = new SystemStream(this.system, this.stream); - } - - /* - * Send a ContentEvent - */ - public void send(MessageCollector collector, ContentEvent contentEvent) { - if (actualSystemStream == null) - this.initSystemStream(); - - switch (this.scheme) { - case SHUFFLE: - this.sendShuffle(collector, contentEvent); - break; - case GROUP_BY_KEY: - this.sendGroupByKey(collector, contentEvent); - break; - case BROADCAST: - this.sendBroadcast(collector, contentEvent); - break; - } - } - - /* - * Helpers - */ - private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) { - collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event)); - } - - private void sendGroupByKey(MessageCollector collector, ContentEvent event) { - 
collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event)); - } - - private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) { - for (int i = 0; i < parallelism; i++) { - collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java deleted file mode 100644 index a30a3a3..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.util.HashSet; -import java.util.Set; - -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.AbstractTopology; - -/** - * Topology for Samza - * - * @author Anh Thu Vu - */ -public class SamzaTopology extends AbstractTopology { - private int procItemCounter; - - public SamzaTopology(String topoName) { - super(topoName); - procItemCounter = 0; - } - - @Override - public void addProcessingItem(IProcessingItem procItem, int parallelism) { - super.addProcessingItem(procItem, parallelism); - SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem; - samzaPi.setName(this.getTopologyName() + "-" + Integer.toString(procItemCounter)); - procItemCounter++; - } - - /* - * Gets the set of ProcessingItems, excluding EntrancePIs Used by - * SamzaConfigFactory as the config for EntrancePIs and normal PIs are - * different - */ - public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception { - Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>(); - copiedSet.addAll(this.getProcessingItems()); - boolean result = copiedSet.removeAll(this.getEntranceProcessingItems()); - if (!result) { - throw new Exception("Failed extracting the set of non-entrance processing items"); - } - return copiedSet; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java deleted file mode 100644 index 972daa2..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java +++ /dev/null @@ -1,539 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under 
the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import org.apache.commons.io.FileUtils; -import org.apache.samza.config.MapConfig; -import org.apache.samza.job.local.LocalJobFactory; -import org.apache.samza.job.yarn.YarnJobFactory; -import org.apache.samza.system.kafka.KafkaSystemFactory; - -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.impl.SamoaSystemFactory; -import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem; -import com.yahoo.labs.samoa.topology.impl.SamzaProcessingItem; -import com.yahoo.labs.samoa.topology.impl.SamzaStream; -import com.yahoo.labs.samoa.topology.impl.SamzaTopology; -import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream; - -/** - * Generate Configs that will be used to submit Samza jobs from the input topology (one config per PI/EntrancePI in the - * topology) - * - * @author Anh Thu Vu - * - */ -public class SamzaConfigFactory { - public static final String 
SYSTEM_NAME = "samoa"; - - // DEFAULT VALUES - private static final String DEFAULT_ZOOKEEPER = "localhost:2181"; - private static final String DEFAULT_BROKER_LIST = "localhost:9092"; - - // DELIMINATORS - public static final String COMMA = ","; - public static final String COLON = ":"; - public static final String DOT = "."; - public static final char DOLLAR_SIGN = '$'; - public static final char QUESTION_MARK = '?'; - - // PARTITIONING SCHEMES - public static final String SHUFFLE = "shuffle"; - public static final String KEY = "key"; - public static final String BROADCAST = "broadcast"; - - // PROPERTY KEYS - // JOB - public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class"; - public static final String JOB_NAME_KEY = "job.name"; - // YARN - public static final String YARN_PACKAGE_KEY = "yarn.package.path"; - public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb"; - public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb"; - public static final String CONTAINER_COUNT_KEY = "yarn.container.count"; - // TASK (SAMZA original) - public static final String TASK_CLASS_KEY = "task.class"; - public static final String TASK_INPUTS_KEY = "task.inputs"; - // TASK (extra) - public static final String FILE_KEY = "task.processor.file"; - public static final String FILESYSTEM_KEY = "task.processor.filesystem"; - public static final String ENTRANCE_INPUT_KEY = "task.entrance.input"; - public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs"; - public static final String YARN_CONF_HOME_KEY = "yarn.config.home"; - // KAFKA - public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect"; - public static final String BROKER_URI_KEY = "producer.metadata.broker.list"; - public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages"; - public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type"; - // SERDE - public static final String SERDE_REGISTRATION_KEY = 
"kryo.register"; - - // Instance variables - private boolean isLocalMode; - private String zookeeper; - private String kafkaBrokerList; - private int replicationFactor; - private int amMemory; - private int containerMemory; - private int piPerContainerRatio; - private int checkpointFrequency; // in ms - - private String jarPath; - private String kryoRegisterFile = null; - - public SamzaConfigFactory() { - this.isLocalMode = false; - this.zookeeper = DEFAULT_ZOOKEEPER; - this.kafkaBrokerList = DEFAULT_BROKER_LIST; - this.checkpointFrequency = 60000; // default: 1 minute - this.replicationFactor = 1; - } - - /* - * Setter methods - */ - public SamzaConfigFactory setYarnPackage(String packagePath) { - this.jarPath = packagePath; - return this; - } - - public SamzaConfigFactory setLocalMode(boolean isLocal) { - this.isLocalMode = isLocal; - return this; - } - - public SamzaConfigFactory setZookeeper(String zk) { - this.zookeeper = zk; - return this; - } - - public SamzaConfigFactory setKafka(String brokerList) { - this.kafkaBrokerList = brokerList; - return this; - } - - public SamzaConfigFactory setCheckpointFrequency(int freq) { - this.checkpointFrequency = freq; - return this; - } - - public SamzaConfigFactory setReplicationFactor(int replicationFactor) { - this.replicationFactor = replicationFactor; - return this; - } - - public SamzaConfigFactory setAMMemory(int mem) { - this.amMemory = mem; - return this; - } - - public SamzaConfigFactory setContainerMemory(int mem) { - this.containerMemory = mem; - return this; - } - - public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) { - this.piPerContainerRatio = piPerContainer; - return this; - } - - public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) { - this.kryoRegisterFile = kryoRegister; - return this; - } - - /* - * Generate a map of all config properties for the input SamzaProcessingItem - */ - private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String 
filesystem) throws Exception { - Map<String, String> map = getBasicSystemConfig(); - - // Set job name, task class, task inputs (from SamzaProcessingItem) - setJobName(map, pi.getName()); - setTaskClass(map, SamzaProcessingItem.class.getName()); - - StringBuilder streamNames = new StringBuilder(); - boolean first = true; - for (SamzaSystemStream stream : pi.getInputStreams()) { - if (!first) - streamNames.append(COMMA); - streamNames.append(stream.getSystem() + DOT + stream.getStream()); - if (first) - first = false; - } - setTaskInputs(map, streamNames.toString()); - - // Processor file - setFileName(map, filename); - setFileSystem(map, filesystem); - - List<String> nameList = new ArrayList<String>(); - // Default kafka system: kafka0: sync producer - // This system is always required: it is used for checkpointing - nameList.add("kafka0"); - setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1); - // Output streams: set kafka systems - for (SamzaStream stream : pi.getOutputStreams()) { - boolean found = false; - for (String name : nameList) { - if (stream.getSystemName().equals(name)) { - found = true; - break; - } - } - if (!found) { - nameList.add(stream.getSystemName()); - setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize()); - } - } - // Input streams: set kafka systems - for (SamzaSystemStream stream : pi.getInputStreams()) { - boolean found = false; - for (String name : nameList) { - if (stream.getSystem().equals(name)) { - found = true; - break; - } - } - if (!found) { - nameList.add(stream.getSystem()); - setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1); - } - } - - // Checkpointing - setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"); - setValue(map, "task.checkpoint.system", "kafka0"); - setValue(map, "task.commit.ms", "1000"); - setValue(map, "task.checkpoint.replication.factor", 
Integer.toString(this.replicationFactor)); - - // Number of containers - setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio); - - return map; - } - - /* - * Generate a map of all config properties for the input SamzaProcessingItem - */ - public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) { - Map<String, String> map = getBasicSystemConfig(); - - // Set job name, task class (from SamzaEntranceProcessingItem) - setJobName(map, epi.getName()); - setTaskClass(map, SamzaEntranceProcessingItem.class.getName()); - - // Input for the entrance task (from our custom consumer) - setTaskInputs(map, SYSTEM_NAME + "." + epi.getName()); - - // Output from entrance task - // Since entrancePI should have only 1 output stream - // there is no need for checking the batch size, setting different system - // names - // The custom consumer (samoa system) does not suuport reading from a - // specific index - // => no need for checkpointing - SamzaStream outputStream = (SamzaStream) epi.getOutputStream(); - // Set samoa system factory - setValue(map, "systems." 
+ SYSTEM_NAME + ".samza.factory", SamoaSystemFactory.class.getName()); - // Set Kafka system (only if there is an output stream) - if (outputStream != null) - setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList, - outputStream.getBatchSize()); - - // Processor file - setFileName(map, filename); - setFileSystem(map, filesystem); - - // Number of containers - setNumberOfContainers(map, 1, this.piPerContainerRatio); - - return map; - } - - /* - * Generate a list of map (of config properties) for all PIs and EPI in the - * input topology - */ - public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) throws Exception { - - List<Map<String, String>> maps = new ArrayList<Map<String, String>>(); - - // File to write serialized objects - String filename = topology.getTopologyName() + ".dat"; - Path dirPath = FileSystems.getDefault().getPath("dat"); - Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename); - String dstPath = filePath.toString(); - String resPath; - String filesystem; - if (this.isLocalMode) { - filesystem = SystemsUtils.LOCAL_FS; - File dir = dirPath.toFile(); - if (!dir.exists()) - FileUtils.forceMkdir(dir); - } - else { - filesystem = SystemsUtils.HDFS; - } - - // Correct system name for streams - this.setSystemNameForStreams(topology.getStreams()); - - // Add all PIs to a collection (map) - Map<String, Object> piMap = new HashMap<String, Object>(); - Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems(); - Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems(); - for (EntranceProcessingItem epi : entranceProcessingItems) { - SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; - piMap.put(sepi.getName(), sepi); - } - for (IProcessingItem pi : processingItems) { - SamzaProcessingItem spi = (SamzaProcessingItem) pi; - piMap.put(spi.getName(), spi); - } - - // Serialize all PIs - boolean serialized = 
false; - if (this.isLocalMode) { - serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath); - resPath = dstPath; - } - else { - resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath); - serialized = resPath != null; - } - - if (!serialized) { - throw new Exception("Fail serialize map of PIs to file"); - } - - // MapConfig for all PIs - for (EntranceProcessingItem epi : entranceProcessingItems) { - SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi; - maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem)); - } - for (IProcessingItem pi : processingItems) { - SamzaProcessingItem spi = (SamzaProcessingItem) pi; - maps.add(this.getMapForPI(spi, resPath, filesystem)); - } - - return maps; - } - - /** - * Construct a list of MapConfigs for a Topology - * - * @return the list of MapConfigs - * @throws Exception - */ - public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception { - List<MapConfig> configs = new ArrayList<MapConfig>(); - List<Map<String, String>> maps = this.getMapsForTopology(topology); - for (Map<String, String> map : maps) { - configs.add(new MapConfig(map)); - } - return configs; - } - - /* - * - */ - public void setSystemNameForStreams(Set<Stream> streams) { - Map<Integer, String> batchSizeMap = new HashMap<Integer, String>(); - batchSizeMap.put(1, "kafka0"); // default system with sync producer - int counter = 0; - for (Stream stream : streams) { - SamzaStream samzaStream = (SamzaStream) stream; - String systemName = batchSizeMap.get(samzaStream.getBatchSize()); - if (systemName == null) { - counter++; - // Add new system - systemName = "kafka" + Integer.toString(counter); - batchSizeMap.put(samzaStream.getBatchSize(), systemName); - } - samzaStream.setSystemName(systemName); - } - - } - - /* - * Generate a map with common properties for PIs and EPI - */ - private Map<String, String> getBasicSystemConfig() { - Map<String, String> map = new HashMap<String, String>(); - // 
Job & Task - if (this.isLocalMode) - map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName()); - else { - map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName()); - - // yarn - map.put(YARN_PACKAGE_KEY, jarPath); - map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory)); - map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory)); - map.put(CONTAINER_COUNT_KEY, "1"); - map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome()); - - // Task opts (Heap size = 0.75 container memory) - int heapSize = (int) (0.75 * this.containerMemory); - map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M -XX:+PrintGCDateStamps"); - } - - map.put(JOB_NAME_KEY, ""); - map.put(TASK_CLASS_KEY, ""); - map.put(TASK_INPUTS_KEY, ""); - - // register serializer - map.put("serializers.registry.kryo.class", SamzaKryoSerdeFactory.class.getName()); - - // Serde registration - setKryoRegistration(map, this.kryoRegisterFile); - - return map; - } - - /* - * Helper methods to set different properties in the input map - */ - private static void setJobName(Map<String, String> map, String jobName) { - map.put(JOB_NAME_KEY, jobName); - } - - private static void setFileName(Map<String, String> map, String filename) { - map.put(FILE_KEY, filename); - } - - private static void setFileSystem(Map<String, String> map, String filesystem) { - map.put(FILESYSTEM_KEY, filesystem); - } - - private static void setTaskClass(Map<String, String> map, String taskClass) { - map.put(TASK_CLASS_KEY, taskClass); - } - - private static void setTaskInputs(Map<String, String> map, String inputs) { - map.put(TASK_INPUTS_KEY, inputs); - } - - private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) { - if (kryoRegisterFile != null) { - String value = readKryoRegistration(kryoRegisterFile); - map.put(SERDE_REGISTRATION_KEY, value); - } - } - - private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) { - int res 
= parallelism / piPerContainer; - if (parallelism % piPerContainer != 0) - res++; - map.put(CONTAINER_COUNT_KEY, Integer.toString(res)); - } - - private static void setKafkaSystem(Map<String, String> map, String systemName, String zk, String brokers, - int batchSize) { - map.put("systems." + systemName + ".samza.factory", KafkaSystemFactory.class.getName()); - map.put("systems." + systemName + ".samza.msg.serde", "kryo"); - - map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk); - map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers); - map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, Integer.toString(batchSize)); - - map.put("systems." + systemName + ".samza.offset.default", "oldest"); - - if (batchSize > 1) { - map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "async"); - } - else { - map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync"); - } - } - - // Set custom properties - private static void setValue(Map<String, String> map, String key, String value) { - map.put(key, value); - } - - /* - * Helper method to parse Kryo registration file - */ - private static String readKryoRegistration(String filePath) { - FileInputStream fis = null; - Properties props = new Properties(); - StringBuilder result = new StringBuilder(); - try { - fis = new FileInputStream(filePath); - props.load(fis); - - boolean first = true; - String value = null; - for (String k : props.stringPropertyNames()) { - if (!first) - result.append(COMMA); - else - first = false; - - // Need to avoid the dollar sign as samza pass all the properties in - // the config to containers via commandline parameters/enviroment - // variables - // We might escape the dollar sign, but it's more complicated than - // replacing it with something else - result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); - value = props.getProperty(k); - if (value != null && value.trim().length() > 0) { - result.append(COLON); - 
result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK)); - } - } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - if (fis != null) - try { - fis.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - return result.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java deleted file mode 100644 index 1874fa3..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.io.ByteArrayOutputStream; - -import org.apache.samza.config.Config; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerdeFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -/** - * Implementation of Samza's SerdeFactory that uses Kryo to serialize/deserialize objects - * - * @author Anh Thu Vu - * @param <T> - * - */ -public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> { - - private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class); - - public static class SamzaKryoSerde<V> implements Serde<V> { - private Kryo kryo; - - public SamzaKryoSerde(String registrationInfo) { - this.kryo = new Kryo(); - this.register(registrationInfo); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void register(String registrationInfo) { - if (registrationInfo == null) - return; - - String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA); - - Class targetClass = null; - Class serializerClass = null; - Serializer serializer = null; - - for (String info : infoList) { - String[] fields = info.split(SamzaConfigFactory.COLON); - - targetClass = null; - serializerClass = null; - if (fields.length > 0) { - try { - targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, - SamzaConfigFactory.DOLLAR_SIGN)); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - if (fields.length > 1) { - try { - serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, - SamzaConfigFactory.DOLLAR_SIGN)); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - if (targetClass != null) { - if (serializerClass == null) { - 
kryo.register(targetClass); - } - else { - serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>) serializerClass); - kryo.register(targetClass, serializer); - } - } - else { - logger.info("Invalid registration info:{}", info); - } - } - } - - @SuppressWarnings("rawtypes") - private static Serializer resolveSerializerInstance(Kryo k, Class superClass, - Class<? extends Serializer> serializerClass) { - try { - try { - return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass); - } catch (Exception ex1) { - try { - return serializerClass.getConstructor(Kryo.class).newInstance(k); - } catch (Exception ex2) { - try { - return serializerClass.getConstructor(Class.class).newInstance(superClass); - } catch (Exception ex3) { - return serializerClass.newInstance(); - } - } - } - } catch (Exception ex) { - throw new IllegalArgumentException("Unable to create serializer \"" - + serializerClass.getName() - + "\" for class: " - + superClass.getName(), ex); - } - } - - /* - * Implement Samza Serde interface - */ - @Override - public byte[] toBytes(V obj) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeClassAndObject(output, obj); - output.flush(); - output.close(); - return bos.toByteArray(); - } - - @SuppressWarnings("unchecked") - @Override - public V fromBytes(byte[] byteArr) { - Input input = new Input(byteArr); - Object obj = kryo.readClassAndObject(input); - input.close(); - return (V) obj; - } - - } - - @Override - public Serde<T> getSerde(String name, Config config) { - return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java ---------------------------------------------------------------------- diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java deleted file mode 100644 index 4b7850d..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -/** - * Serialize and deserialize objects with Java serialization - * - * @author Anh Thu Vu - */ -public class SerializableSerializer extends Serializer<Object> { - @Override - public void write(Kryo kryo, Output output, Object object) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(object); - oos.flush(); - } catch (IOException e) { - throw new RuntimeException(e); - } - byte[] ser = bos.toByteArray(); - output.writeInt(ser.length); - output.writeBytes(ser); - } - - @SuppressWarnings("rawtypes") - @Override - public Object read(Kryo kryo, Input input, Class c) { - int len = input.readInt(); - byte[] ser = new byte[len]; - input.readBytes(ser); - ByteArrayInputStream bis = new ByteArrayInputStream(ser); - try { - ObjectInputStream ois = new ObjectInputStream(bis); - return ois.readObject(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java deleted file mode 100644 index 0b12971..0000000 --- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java +++ /dev/null @@ -1,386 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% 
- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.nio.file.FileSystems; -import java.util.Map; -import java.util.Properties; - -import kafka.admin.AdminUtils; -import kafka.utils.ZKStringSerializer; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.exception.ZkMarshallingError; -import org.I0Itec.zkclient.serialize.ZkSerializer; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities methods for: - Kafka - HDFS - Handling files on local FS - * - * @author Anh Thu Vu - */ -public class SystemsUtils { - private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class); - - public static final String HDFS = "hdfs"; - public static final String LOCAL_FS = "local"; - - private static final String TEMP_FILE = "samoaTemp"; - private static final String TEMP_FILE_SUFFIX = ".dat"; - - /* - * Kafka - */ - private static class KafkaUtils { - private static ZkClient zkClient; - - static void setZookeeper(String zk) { - zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper()); - } - - 
/* - * Create Kafka topic/stream - */ - static void createKafkaTopic(String name, int partitions, int replicas) { - AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties()); - } - - static class ZKStringSerializerWrapper implements ZkSerializer { - @Override - public Object deserialize(byte[] byteArray) throws ZkMarshallingError { - return ZKStringSerializer.deserialize(byteArray); - } - - @Override - public byte[] serialize(Object obj) throws ZkMarshallingError { - return ZKStringSerializer.serialize(obj); - } - } - } - - /* - * HDFS - */ - private static class HDFSUtils { - private static String coreConfPath; - private static String hdfsConfPath; - private static String configHomePath; - private static String samoaDir = null; - - static void setHadoopConfigHome(String hadoopConfPath) { - logger.info("Hadoop config home:{}", hadoopConfPath); - configHomePath = hadoopConfPath; - java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml"); - java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml"); - coreConfPath = coreSitePath.toAbsolutePath().toString(); - hdfsConfPath = hdfsSitePath.toAbsolutePath().toString(); - } - - static String getNameNodeUri() { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - return config.get("fs.defaultFS"); - } - - static String getHadoopConfigHome() { - return configHomePath; - } - - static void setSAMOADir(String dir) { - if (dir != null) - samoaDir = getNameNodeUri() + dir; - else - samoaDir = null; - } - - static String getDefaultSAMOADir() throws IOException { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - FileSystem fs = FileSystem.get(config); - Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa"); - return defaultDir.toString(); - } - 
- static boolean deleteFileIfExist(String absPath) { - Path p = new Path(absPath); - return deleteFileIfExist(p); - } - - static boolean deleteFileIfExist(Path p) { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - FileSystem fs; - try { - fs = FileSystem.get(config); - if (fs.exists(p)) { - return fs.delete(p, false); - } - else - return true; - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return false; - } - - /* - * Write to HDFS - */ - static String writeToHDFS(File file, String dstPath) { - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - logger.info("Filesystem name:{}", config.get("fs.defaultFS")); - - // Default samoaDir - if (samoaDir == null) { - try { - samoaDir = getDefaultSAMOADir(); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - } - - // Setup src and dst paths - // java.nio.file.Path tempPath = - // FileSystems.getDefault().getPath(samoaDir, dstPath); - Path dst = new Path(samoaDir, dstPath); - Path src = new Path(file.getAbsolutePath()); - - // Delete file if already exists in HDFS - if (deleteFileIfExist(dst) == false) - return null; - - // Copy to HDFS - FileSystem fs; - try { - fs = FileSystem.get(config); - fs.copyFromLocalFile(src, dst); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - - return dst.toString(); // abs path to file - } - - /* - * Read from HDFS - */ - static Object deserializeObjectFromFile(String filePath) { - logger.info("Deserialize HDFS file:{}", filePath); - Configuration config = new Configuration(); - config.addResource(new Path(coreConfPath)); - config.addResource(new Path(hdfsConfPath)); - - Path file = new Path(filePath); - FSDataInputStream dataInputStream = null; - ObjectInputStream ois = null; - Object obj = null; - FileSystem fs; - try { - fs = 
FileSystem.get(config); - dataInputStream = fs.open(file); - ois = new ObjectInputStream(dataInputStream); - obj = ois.readObject(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (ClassNotFoundException e) { - try { - if (dataInputStream != null) - dataInputStream.close(); - if (ois != null) - ois.close(); - } catch (IOException ioException) { - // TODO auto-generated catch block - e.printStackTrace(); - } - } - - return obj; - } - - } - - private static class LocalFileSystemUtils { - static boolean serializObjectToFile(Object obj, String fn) { - FileOutputStream fos = null; - ObjectOutputStream oos = null; - try { - fos = new FileOutputStream(fn); - oos = new ObjectOutputStream(fos); - oos.writeObject(obj); - } catch (FileNotFoundException e) { - e.printStackTrace(); - return false; - } catch (IOException e) { - e.printStackTrace(); - return false; - } finally { - try { - if (fos != null) - fos.close(); - if (oos != null) - oos.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - return true; - } - - static Object deserializeObjectFromLocalFile(String filename) { - logger.info("Deserialize local file:{}", filename); - FileInputStream fis = null; - ObjectInputStream ois = null; - Object obj = null; - try { - fis = new FileInputStream(filename); - ois = new ObjectInputStream(fis); - obj = ois.readObject(); - } catch (IOException e) { - // TODO auto-generated catch block - e.printStackTrace(); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - try { - if (fis != null) - fis.close(); - if (ois != null) - ois.close(); - } catch (IOException e) { - // TODO auto-generated catch block - e.printStackTrace(); - } - } - - return obj; - } - } - - /* - * Create streams - */ - public static void createKafkaTopic(String name, int partitions) { - createKafkaTopic(name, partitions, 1); - } - - public static void createKafkaTopic(String name, int 
partitions, int replicas) { - KafkaUtils.createKafkaTopic(name, partitions, replicas); - } - - /* - * Serialize object - */ - public static boolean serializeObjectToLocalFileSystem(Object object, String path) { - return LocalFileSystemUtils.serializObjectToFile(object, path); - } - - public static String serializeObjectToHDFS(Object object, String path) { - File tmpDatFile; - try { - tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX); - if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) { - return HDFSUtils.writeToHDFS(tmpDatFile, path); - } - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - } - - /* - * Deserialize object - */ - @SuppressWarnings("unchecked") - public static Map<String, Object> deserializeMapFromFile(String filesystem, String filename) { - Map<String, Object> map; - if (filesystem.equals(HDFS)) { - map = (Map<String, Object>) HDFSUtils.deserializeObjectFromFile(filename); - } - else { - map = (Map<String, Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename); - } - return map; - } - - public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) { - Map<String, Object> map = deserializeMapFromFile(filesystem, filename); - if (map == null) - return null; - return map.get(key); - } - - /* - * Setup - */ - public static void setZookeeper(String zookeeper) { - KafkaUtils.setZookeeper(zookeeper); - } - - public static void setHadoopConfigHome(String hadoopHome) { - HDFSUtils.setHadoopConfigHome(hadoopHome); - } - - public static void setSAMOADir(String samoaDir) { - HDFSUtils.setSAMOADir(samoaDir); - } - - /* - * Others - */ - public static String getHDFSNameNodeUri() { - return HDFSUtils.getNameNodeUri(); - } - - public static String getHadoopConfigHome() { - return HDFSUtils.getHadoopConfigHome(); - } - - public static String copyToHDFS(File file, String dstPath) { - return HDFSUtils.writeToHDFS(file, 
dstPath); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java b/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java new file mode 100644 index 0000000..3fe3a25 --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java @@ -0,0 +1,226 @@ +package org.apache.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.File; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.impl.SamzaComponentFactory; +import org.apache.samoa.topology.impl.SamzaEngine; +import org.apache.samoa.topology.impl.SamzaTopology; +import org.apache.samoa.utils.SystemsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; + +/** + * Main class to run the task on Samza + * + * @author Anh Thu Vu + */ +public class SamzaDoTask { + + private static final Logger logger = LoggerFactory.getLogger(SamzaDoTask.class); + + private static final String LOCAL_MODE = "local"; + private static final String YARN_MODE = "yarn"; + + // FLAGS + private static final String YARN_CONF_FLAG = "--yarn_home"; + private static final String MODE_FLAG = "--mode"; + private static final String ZK_FLAG = "--zookeeper"; + private static final String KAFKA_FLAG = "--kafka"; + private static final String KAFKA_REPLICATION_FLAG = "--kafka_replication_factor"; + private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency"; + private static final String JAR_PACKAGE_FLAG = "--jar_package"; + private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir"; + private static final String AM_MEMORY_FLAG = "--yarn_am_mem"; + private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem"; + private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container"; + + private static final String KRYO_REGISTER_FLAG = "--kryo_register"; + + // config values + private static int kafkaReplicationFactor = 1; + private static int checkpointFrequency = 60000; + private static String kafka = "localhost:9092"; + private static String zookeeper = "localhost:2181"; + private static boolean isLocal = true; + private static String yarnConfHome = null; + private static String 
samoaHDFSDir = null; + private static String jarPackagePath = null; + private static int amMem = 1024; + private static int containerMem = 1024; + private static int piPerContainer = 2; + private static String kryoRegisterFile = null; + + /* + * 1. Read arguments 2. Construct topology/task 3. Upload the JAR to HDFS if + * we are running on YARN 4. Submit topology to SamzaEngine + */ + public static void main(String[] args) { + // Read arguments + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + parseArguments(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + // Init Task + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task = null; + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new SamzaComponentFactory()); + task.init(); + + // Upload JAR file to HDFS + String hdfsPath = null; + if (!isLocal) { + Path path = FileSystems.getDefault().getPath(jarPackagePath); + hdfsPath = uploadJarToHDFS(path.toFile()); + if (hdfsPath == null) { + System.out.println("Fail uploading JAR file \"" + path.toAbsolutePath().toString() + "\" to HDFS."); + return; + } + } + + // Set parameters + SamzaEngine.getEngine() + .setLocalMode(isLocal) + .setZooKeeper(zookeeper) + .setKafka(kafka) + .setYarnPackage(hdfsPath) + .setKafkaReplicationFactor(kafkaReplicationFactor) + .setConfigHome(yarnConfHome) + .setAMMemory(amMem) + .setContainerMemory(containerMem) + .setPiPerContainerRatio(piPerContainer) + .setKryoRegisterFile(kryoRegisterFile) + 
.setCheckpointFrequency(checkpointFrequency); + + // Submit topology + SamzaEngine.submitTopology((SamzaTopology) task.getTopology()); + + } + + private static boolean isLocalMode(String mode) { + return mode.equals(LOCAL_MODE); + } + + private static void parseArguments(List<String> args) { + for (int i = args.size() - 1; i >= 0; i--) { + String arg = args.get(i).trim(); + String[] splitted = arg.split("=", 2); + + if (splitted.length >= 2) { + // YARN config folder which contains conf/core-site.xml, + // conf/hdfs-site.xml, conf/yarn-site.xml + if (splitted[0].equals(YARN_CONF_FLAG)) { + yarnConfHome = splitted[1]; + args.remove(i); + } + // host:port for zookeeper cluster + else if (splitted[0].equals(ZK_FLAG)) { + zookeeper = splitted[1]; + args.remove(i); + } + // host:port,... for kafka broker(s) + else if (splitted[0].equals(KAFKA_FLAG)) { + kafka = splitted[1]; + args.remove(i); + } + // whether we are running Samza in Local mode or YARN mode + else if (splitted[0].equals(MODE_FLAG)) { + isLocal = isLocalMode(splitted[1]); + args.remove(i); + } + // memory requirement for YARN application master + else if (splitted[0].equals(AM_MEMORY_FLAG)) { + amMem = Integer.parseInt(splitted[1]); + args.remove(i); + } + // memory requirement for YARN worker container + else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) { + containerMem = Integer.parseInt(splitted[1]); + args.remove(i); + } + // the path to JAR archive that we need to upload to HDFS + else if (splitted[0].equals(JAR_PACKAGE_FLAG)) { + jarPackagePath = splitted[1]; + args.remove(i); + } + // the HDFS dir for SAMOA files + else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) { + samoaHDFSDir = splitted[1]; + if (samoaHDFSDir.length() < 1) + samoaHDFSDir = null; + args.remove(i); + } + // number of max PI instances per container + // this will be used to compute the number of containers + // AM will request for the job + else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) { + piPerContainer = 
Integer.parseInt(splitted[1]); + args.remove(i); + } + // kafka streams replication factor + else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) { + kafkaReplicationFactor = Integer.parseInt(splitted[1]); + args.remove(i); + } + // checkpoint frequency in ms + else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) { + checkpointFrequency = Integer.parseInt(splitted[1]); + args.remove(i); + } + // the file contains registration information for Kryo serializer + else if (splitted[0].equals(KRYO_REGISTER_FLAG)) { + kryoRegisterFile = splitted[1]; + args.remove(i); + } + } + } + } + + private static String uploadJarToHDFS(File file) { + SystemsUtils.setHadoopConfigHome(yarnConfHome); + SystemsUtils.setSAMOADir(samoaHDFSDir); + return SystemsUtils.copyToHDFS(file, file.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java new file mode 100644 index 0000000..7d4f0a1 --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java @@ -0,0 +1,54 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; + +/** + * Implementation of Samza's SystemFactory Samza will use this factory to get our custom consumer which gets the events + * from SAMOA EntranceProcessor and feed them to EntranceProcessingItem task + * + * @author Anh Thu Vu + */ +public class SamoaSystemFactory implements SystemFactory { + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + return new SinglePartitionWithoutOffsetsSystemAdmin(); + } + + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + return new SamoaSystemConsumer(systemName, config); + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new SamzaException("This implementation is not supposed to produce anything."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java new file mode 100644 index 0000000..5eb9de9 --- /dev/null +++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java @@ -0,0 +1,62 @@ +package 
org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +/** + * Implementation of SAMOA ComponentFactory for Samza + * + * @author Anh Thu Vu + */ +public class SamzaComponentFactory implements ComponentFactory { + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new SamzaProcessingItem(processor, parallelism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new SamzaEntranceProcessingItem(entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + return new SamzaStream(sourcePi); + } + + @Override + public Topology createTopology(String topoName) { + return new SamzaTopology(topoName); + } +}
