Changes in tests (removing unused tests and classses, minor refactoring) Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/a5cda69c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/a5cda69c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/a5cda69c
Branch: refs/heads/master Commit: a5cda69cde10eade36ea192854143ea482314e8a Parents: 84c9487 Author: pwawrzyniak <[email protected]> Authored: Wed Jul 5 10:28:19 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../apache/samoa/streams/kafka/KafkaTask.java | 291 +++++------ .../samoa/streams/kafka/KafkaTaskTest.java | 153 ------ .../samoa/streams/kafka/KafkaUtilsTest.java | 490 ++++++++++--------- .../kafka/topology/SimpleComponentFactory.java | 53 -- .../streams/kafka/topology/SimpleEngine.java | 37 -- .../topology/SimpleEntranceProcessingItem.java | 33 -- .../kafka/topology/SimpleProcessingItem.java | 87 ---- .../streams/kafka/topology/SimpleStream.java | 95 ---- .../streams/kafka/topology/SimpleTopology.java | 46 -- 9 files changed, 394 insertions(+), 891 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java index 0c8f138..b3d638f 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java @@ -1,144 +1,147 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Properties; - -import org.apache.samoa.tasks.Task; -import org.apache.samoa.topology.ComponentFactory; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.topology.Topology; -import org.apache.samoa.topology.TopologyBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.Configurable; -import com.github.javacliparser.IntOption; -import com.github.javacliparser.StringOption; - -/** - * Kafka task - * - * @author Jakub Jankowski - * @version 0.5.0-incubating-SNAPSHOT - * @since 0.5.0-incubating - * - */ - -public class KafkaTask implements Task, Configurable { - - private static final long serialVersionUID = 3984474041982397855L; - private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); - - //czy identyczne dla enterance i destination? - Properties producerProps; - Properties consumerProps; - int timeout; - private final KafkaDeserializer deserializer; - private final KafkaSerializer serializer; - private final String topic; - - private TopologyBuilder builder; - private Topology kafkaTopology; - - public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', - "Number of destination Processors", 1, 1, Integer.MAX_VALUE); - - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", - "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - - /** - * Class constructor - * @param props Properties of Kafka Producer and Consumer - * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> - * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer configuration</a> - * @param topic Topic to which destination processor will write into - * @param timeout Timeout used when polling Kafka for new messages - * @param serializer Implementation of KafkaSerializer that handles arriving data serialization - * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization - */ - public KafkaTask(Properties producerProps, Properties consumerProps, String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { - this.producerProps = producerProps; - this.consumerProps = consumerProps; - this.deserializer = deserializer; - this.serializer = serializer; - this.topic = topic; - this.timeout = timeout; - } - - @Override - public void init() { - logger.info("Invoking init"); - if (builder == null) { - builder = new TopologyBuilder(); - logger.info("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - } - - // create enterance processor - KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer); - builder.addEntranceProcessor(sourceProcessor); - - // create stream - Stream stream = builder.createStream(sourceProcessor); - - // create destination processor - KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, topic, serializer); - builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); - builder.connectInputShuffleStream(stream, destProcessor); - - // build topology - kafkaTopology = builder.build(); - logger.info("Successfully built the topology"); - } - - @Override - public Topology getTopology() { - return kafkaTopology; - } - - @Override - public void setFactory(ComponentFactory factory) { - logger.info("Invoking setFactory: "+factory.toString()); - builder = new TopologyBuilder(factory); - logger.info("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - - } - -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * Kafka task + * + * @author Jakub Jankowski + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + * + */ + +public class KafkaTask implements Task, Configurable { + + private static final long serialVersionUID = 3984474041982397855L; + private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); + + //czy identyczne dla enterance i destination? + Properties producerProps; + Properties consumerProps; + int timeout; + private final KafkaDeserializer deserializer; + private final KafkaSerializer serializer; + + private TopologyBuilder builder; + private Topology kafkaTopology; + + public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', + "Number of destination Processors", 1, 1, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + private final String inTopic; + private final String outTopic; + + /** + * Class constructor + * @param props Properties of Kafka Producer and Consumer + * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> + * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer configuration</a> + * @param topic Topic to which destination processor will write into + * @param timeout Timeout used when polling Kafka for new messages + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization + */ + public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { + this.producerProps = producerProps; + this.consumerProps = consumerProps; + this.deserializer = deserializer; + this.serializer = serializer; + this.inTopic = inTopic; + this.outTopic = outTopic; + this.timeout = timeout; + } + + @Override + public void init() { + logger.info("Invoking init"); + if (builder == null) { + builder = new TopologyBuilder(); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // create enterance processor + KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer); + builder.addEntranceProcessor(sourceProcessor); + + // create stream + Stream stream = builder.createStream(sourceProcessor); + + // create destination processor + KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, outTopic, serializer); + builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); + builder.connectInputShuffleStream(stream, destProcessor); + + // build topology + kafkaTopology = builder.build(); + logger.info("Successfully built the topology"); + } + + @Override + public Topology getTopology() { + return kafkaTopology; + } + + @Override + public void setFactory(ComponentFactory factory) { + logger.info("Invoking setFactory: "+factory.toString()); + builder = new TopologyBuilder(factory); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java deleted file mode 100644 index adecac1..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import java.io.IOException; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; - - -import org.I0Itec.zkclient.ZkClient; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import kafka.server.KafkaServer; -import kafka.zk.EmbeddedZookeeper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.samoa.instances.InstancesHeader; -import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory; -import org.apache.samoa.streams.kafka.topology.SimpleEngine; - -/* - * #%L - * SAMOA - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -/** - * - * @author Jakub Jankowski - */ -@Ignore -public class KafkaTaskTest { - - private static final String ZKHOST = "127.0.0.1";//10.255.251.202"; //10.255.251.202 - private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214"; //10.255.251.214 - private static final String BROKERPORT = "9092"; //6667, local: 9092 - private static final String TOPIC = "samoa_test"; //samoa_test, local: test - private static final int NUM_INSTANCES = 125922; - - private static KafkaServer kafkaServer; - private static EmbeddedZookeeper zkServer; - private static ZkClient zkClient; - private static String zkConnect; - - @BeforeClass - public static void setUpClass() throws IOException { - // setup Zookeeper -// zkServer = new EmbeddedZookeeper(); -// zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); -// zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); -// ZkUtils zkUtils = ZkUtils.apply(zkClient, false); - - // setup Broker - /*Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - KafkaConfig config = new KafkaConfig(brokerProps); - Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock);*/ - // create topic - //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - } - - @AfterClass - public static void tearDownClass() { - //kafkaServer.shutdown(); -// zkClient.close(); -// zkServer.shutdown(); - } - - @Before - public void setUp() throws IOException { - - } - - @After - public void tearDown() { - - } - - @Test - public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException { - Logger logger = Logger.getLogger(KafkaTaskTest.class.getName()); - logger.log(Level.INFO, "KafkaTask"); - Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT); - Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); - - KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new OosTestSerializer(), new OosTestSerializer()); - task.setFactory(new SimpleComponentFactory()); - task.init(); - SimpleEngine.submitTopology(task.getTopology()); - - Thread th = new Thread(new Runnable() { - @Override - public void run() { - KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT)); - - Random r = new Random(); - InstancesHeader header = TestUtilsForKafka.generateHeader(10); - OosTestSerializer serializer = new OosTestSerializer(); - int i = 0; - for (i = 0; i < NUM_INSTANCES; i++) { - try { - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, serializer.serialize(TestUtilsForKafka.getData(r, 10, header))); - long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); -// Thread.sleep(5); - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); - } catch (InterruptedException | ExecutionException | TimeoutException ex) { - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); - } - } - producer.flush(); - producer.close(); - } - }); - th.start(); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java index 5dc4542..186d97b 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -1,243 +1,247 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -import com.google.gson.Gson; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; -import org.I0Itec.zkclient.ZkClient; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.utils.Time; -import org.apache.samoa.instances.InstancesHeader; -import org.junit.After; -import org.junit.AfterClass; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - * @author pwawrzyniak - */ -public class KafkaUtilsTest { - - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private static final String TOPIC_R = "test-r"; - private static final String TOPIC_S = "test-s"; - private static final int NUM_INSTANCES = 50; - - private static KafkaServer kafkaServer; - private static EmbeddedZookeeper zkServer; - private static ZkClient zkClient; - private static String zkConnect; - - private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); - private final long CONSUMER_TIMEOUT = 1500; - - public KafkaUtilsTest() { - } - - @BeforeClass - public static void setUpClass() throws IOException { - // setup Zookeeper - zkServer = new EmbeddedZookeeper(); - zkConnect = ZKHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - ZkUtils zkUtils = ZkUtils.apply(zkClient, false); - - // setup Broker - Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - KafkaConfig config = new KafkaConfig(brokerProps); - Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - - // create topics - AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - - } - - @AfterClass - public static void tearDownClass() { - kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); - } - - @Before - public void setUp() { - } - - @After - public void tearDown() { - } - - /** - * Test of initializeConsumer method, of class KafkaUtils. - */ - @Test - public void testInitializeConsumer() throws Exception { - logger.log(Level.INFO, "initializeConsumer"); - Collection<String> topics = Arrays.asList(TOPIC_R); - KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); - assertNotNull(instance); - - instance.initializeConsumer(topics); - Thread.sleep(1000); - instance.closeConsumer(); - - Thread.sleep(CONSUMER_TIMEOUT); - - instance.initializeConsumer(topics); - Thread.sleep(1000); - instance.closeConsumer(); - assertTrue(true); - } - - /** - * Test of getKafkaMessages method, of class KafkaUtils. - */ - @Test - public void testGetKafkaMessages() throws Exception { - logger.log(Level.INFO, "getKafkaMessages"); - Collection<String> topics = Arrays.asList(TOPIC_R); - KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); - assertNotNull(instance); - - logger.log(Level.INFO, "Initialising consumer"); - instance.initializeConsumer(topics); - - logger.log(Level.INFO, "Produce data"); - List expResult = sendAndGetMessages(NUM_INSTANCES); - - logger.log(Level.INFO, "Wait a moment"); - Thread.sleep(CONSUMER_TIMEOUT); - - logger.log(Level.INFO, "Get results from Kafka"); - List<byte[]> result = instance.getKafkaMessages(); - - assertArrayEquals(expResult.toArray(), result.toArray()); - instance.closeConsumer(); - } - - private List<byte[]> sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException { - List<byte[]> ret; - try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test",BROKERHOST,BROKERPORT))) { - ret = new ArrayList<>(); - Random r = new Random(); - InstancesHeader header = TestUtilsForKafka.generateHeader(10); - Gson gson = new Gson(); - int i = 0; - for (i = 0; i < maxNum; i++) { - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); - ret.add(record.value()); - producer.send(record); - } - producer.flush(); - } - return ret; - } - - /** - * Test of sendKafkaMessage method, of class KafkaUtils. - * - * @throws java.lang.InterruptedException - */ - @Test - public void testSendKafkaMessage() throws InterruptedException { - logger.log(Level.INFO, "sendKafkaMessage"); - - logger.log(Level.INFO, "Initialising producer"); - KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); - instance.initializeProducer(); - - logger.log(Level.INFO, "Initialising consumer"); - KafkaConsumer<String, byte[]> consumer; - consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT)); - consumer.subscribe(Arrays.asList(TOPIC_S)); - - logger.log(Level.INFO, "Produce data"); - List<byte[]> sent = new ArrayList<>(); - Random r = new Random(); - InstancesHeader header = TestUtilsForKafka.generateHeader(10); - Gson gson = new Gson(); - for (int i = 0; i < NUM_INSTANCES; i++) { - byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); - sent.add(val); - instance.sendKafkaMessage(TOPIC_S, val); - } - // wait for Kafka a bit :) - Thread.sleep(2*CONSUMER_TIMEOUT); - - logger.log(Level.INFO, "Get results from Kafka"); - ConsumerRecords<String, byte[]> records = consumer.poll(CONSUMER_TIMEOUT); - Iterator<ConsumerRecord<String, byte[]>> it = records.iterator(); - List<byte[]> consumed = new ArrayList<>(); - while (it.hasNext()) { - consumed.add(it.next().value()); - } - consumer.close(); - - assertArrayEquals(sent.toArray(), consumed.toArray()); - } - -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.instances.InstancesHeader; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + * @author pwawrzyniak + */ +public class KafkaUtilsTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_R = "test-r"; + private static final String TOPIC_S = "test-s"; + private static final int NUM_INSTANCES = 50; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); + private final long CONSUMER_TIMEOUT = 1500; + + public KafkaUtilsTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + /** + * Test of initializeConsumer method, of class KafkaUtils. + */ + @Test + public void testInitializeConsumer() throws Exception { + logger.log(Level.INFO, "initializeConsumer"); + Collection<String> topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT); + assertNotNull(instance); + + instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + + Thread.sleep(CONSUMER_TIMEOUT); + + instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + assertTrue(true); + } + + /** + * Test of getKafkaMessages method, of class KafkaUtils. + */ + @Test + public void testGetKafkaMessages() throws Exception { + logger.log(Level.INFO, "getKafkaMessages"); + Collection<String> topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT); + assertNotNull(instance); + + logger.log(Level.INFO, "Initialising consumer"); + instance.initializeConsumer(topics); + + logger.log(Level.INFO, "Produce data"); + List expResult = sendAndGetMessages(NUM_INSTANCES); + + logger.log(Level.INFO, "Wait a moment"); + Thread.sleep(CONSUMER_TIMEOUT); + + logger.log(Level.INFO, "Get results from Kafka"); + List<byte[]> result = instance.getKafkaMessages(); + + assertArrayEquals(expResult.toArray(), result.toArray()); + instance.closeConsumer(); + } + + private List<byte[]> sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException { + List<byte[]> ret; + try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test", BROKERHOST, BROKERPORT))) { + ret = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 0; + for (i = 0; i < maxNum; i++) { + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); + ret.add(record.value()); + producer.send(record); + } + producer.flush(); + } + return ret; + } + + /** + * Test of sendKafkaMessage method, of class KafkaUtils. + * + * @throws java.lang.InterruptedException + */ + @Test + public void testSendKafkaMessage() throws InterruptedException { + logger.log(Level.INFO, "sendKafkaMessage"); + + logger.log(Level.INFO, "Initialising producer"); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT); + instance.initializeProducer(); + + logger.log(Level.INFO, "Initialising consumer"); + KafkaConsumer<String, byte[]> consumer; + consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT)); + consumer.subscribe(Arrays.asList(TOPIC_S)); + + logger.log(Level.INFO, "Produce data"); + List<byte[]> sent = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + for (int i = 0; i < NUM_INSTANCES; i++) { + byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); + sent.add(val); + instance.sendKafkaMessage(TOPIC_S, val); + } + // wait for Kafka a bit :) + Thread.sleep(2 * CONSUMER_TIMEOUT); + + logger.log(Level.INFO, "Get results from Kafka"); + + List<byte[]> consumed = new ArrayList<>(); + + while (consumed.size() != sent.size()) { + ConsumerRecords<String, byte[]> records = consumer.poll(CONSUMER_TIMEOUT); + Iterator<ConsumerRecord<String, byte[]>> it = records.iterator(); + while (it.hasNext()) { + consumed.add(it.next().value()); + } + } + consumer.close(); + + assertArrayEquals(sent.toArray(), consumed.toArray()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java deleted file mode 100644 index 202833e..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.core.Processor; -import org.apache.samoa.topology.ComponentFactory; -import org.apache.samoa.topology.EntranceProcessingItem; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.topology.ProcessingItem; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.topology.Topology; - -public class SimpleComponentFactory implements ComponentFactory { - - public ProcessingItem createPi(Processor processor, int paralellism) { - return new SimpleProcessingItem(processor, paralellism); - } - - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } - - public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - return new SimpleEntranceProcessingItem(processor); - } - - public Stream createStream(IProcessingItem sourcePi) { - return new SimpleStream(sourcePi); - } - - public Topology createTopology(String topoName) { - return new SimpleTopology(topoName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java deleted file mode 100644 index 338444b..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.topology.Topology; - -public class SimpleEngine { - - public static void submitTopology(Topology topology) { - SimpleTopology simpleTopology = (SimpleTopology) topology; - simpleTopology.run(); - // runs until completion - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java deleted file mode 100644 index 26ed471..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.topology.LocalEntranceProcessingItem; - -class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { - public SimpleEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // The default waiting time when there is no available events is 100ms - // Override waitForNewEvents() to change it -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java deleted file mode 100644 index bac0398..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.Processor; -import org.apache.samoa.topology.AbstractProcessingItem; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.topology.ProcessingItem; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.utils.PartitioningScheme; -import org.apache.samoa.utils.StreamDestination; - -/** - * - * @author abifet - */ -class SimpleProcessingItem extends AbstractProcessingItem { - private IProcessingItem[] arrayProcessingItem; - - SimpleProcessingItem(Processor processor) { - super(processor); - } - - SimpleProcessingItem(Processor processor, int parallelism) { - super(processor); - this.setParallelism(parallelism); - } - - public IProcessingItem getProcessingItem(int i) { - return arrayProcessingItem[i]; - } - - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); - ((SimpleStream) inputStream).addDestination(destination); - return this; - } - - public SimpleProcessingItem copy() { - Processor processor = this.getProcessor(); - return new SimpleProcessingItem(processor.newProcessor(processor)); - } - - public void processEvent(ContentEvent event, int counter) { - - int parallelism = this.getParallelism(); - // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); - if (this.arrayProcessingItem == null && parallelism > 0) { - // Init processing elements, the first time they are needed - this.arrayProcessingItem = new IProcessingItem[parallelism]; - for (int j = 0; j < parallelism; j++) { - arrayProcessingItem[j] = this.copy(); - arrayProcessingItem[j].getProcessor().onCreate(j); - } - } - if (this.arrayProcessingItem != null) { - IProcessingItem pi = this.getProcessingItem(counter); - Processor p = pi.getProcessor(); - // System.out.println("PI="+pi+", p="+p); - this.getProcessingItem(counter).getProcessor().process(event); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java deleted file mode 100644 index 8405463..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.topology.AbstractStream; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.utils.StreamDestination; - -/** - * - * @author abifet - */ -class SimpleStream extends AbstractStream { - private List<StreamDestination> destinations; - private int maxCounter; - private int eventCounter; - - SimpleStream(IProcessingItem sourcePi) { - super(sourcePi); - this.destinations = new LinkedList<>(); - this.eventCounter = 0; - this.maxCounter = 1; - } - - private int getNextCounter() { - if (maxCounter > 0 && eventCounter >= maxCounter) - eventCounter = 0; - this.eventCounter++; - return this.eventCounter; - } - - @Override - public void put(ContentEvent event) { - this.put(event, this.getNextCounter()); - } - - private void put(ContentEvent event, int counter) { - SimpleProcessingItem pi; - int parallelism; - for (StreamDestination destination : destinations) { - pi = (SimpleProcessingItem) destination.getProcessingItem(); - parallelism = destination.getParallelism(); - switch (destination.getPartitioningScheme()) { - case SHUFFLE: - pi.processEvent(event, counter % parallelism); - break; - case GROUP_BY_KEY: - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - int key = hb.build() % parallelism; - pi.processEvent(event, key); - break; - case BROADCAST: - for (int p = 0; p < parallelism; p++) { - pi.processEvent(event, p); - } - break; - } - } - } - - public void addDestination(StreamDestination destination) { - this.destinations.add(destination); - if (maxCounter <= 0) - maxCounter = 1; - maxCounter *= destination.getParallelism(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java deleted file mode 100644 index d298b69..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.topology.AbstractTopology; - -public class SimpleTopology extends AbstractTopology { - SimpleTopology(String name) { - super(name); - } - - public void run() { - if (this.getEntranceProcessingItems() == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - if (this.getEntranceProcessingItems().size() != 1) - throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " - + this.getEntranceProcessingItems().size()); - - SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() - .toArray()[0]; - entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode - entrancePi.startSendingEvents(); - } -}
