Rebuild of Consumer-related classes (added a separate thread to read from Kafka, blocking until something is received). Major cleanup in tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/81d29599 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/81d29599 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/81d29599 Branch: refs/heads/master Commit: 81d295995722161b8b86e7c6b4f9298cd5665d8a Parents: be32e18 Author: pwawrzyniak <[email protected]> Authored: Fri May 12 17:17:27 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../streams/kafka/KafkaConsumerThread.java | 157 +++++++++++++++++++ .../streams/kafka/KafkaEntranceProcessor.java | 2 +- .../apache/samoa/streams/kafka/KafkaUtils.java | 47 ++---- .../kafka/KafkaDestinationProcessorTest.java | 47 ++++-- .../kafka/KafkaEntranceProcessorTest.java | 151 +++++++----------- .../samoa/streams/kafka/KafkaTaskTest.java | 56 +++++-- .../samoa/streams/kafka/KafkaUtilsTest.java | 64 ++++---- .../samoa/streams/kafka/TestUtilsForKafka.java | 22 ++- 8 files changed, 349 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java new file mode 100644 index 0000000..6522f67 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java @@ -0,0 +1,157 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +/** + * + * @author pwawrzyniak <your.name at your.org> + */ +class KafkaConsumerThread extends Thread { + + // Consumer class for internal use to retrieve messages from Kafka + private transient KafkaConsumer<String, byte[]> consumer; + + private Logger log = Logger.getLogger(KafkaConsumerThread.class.getName()); + + private final Properties consumerProperties; + private final Collection<String> topics; + private final long consumerTimeout; + private final List<byte[]> buffer; + // used to synchronize things + private final Object lock; + private boolean running; + + /** + * Class constructor + * + * @param consumerProperties Properties of Consumer + * @param topics Topics to fetch (subscribe) + * @param consumerTimeout Timeout for data polling + */ + KafkaConsumerThread(Properties consumerProperties, Collection<String> topics, long consumerTimeout) { + this.running = false; + this.consumerProperties = consumerProperties; + this.topics = topics; + this.consumerTimeout = consumerTimeout; + this.buffer = new ArrayList<>(); + lock = new Object(); + } + + @Override + public void run() { + + initializeConsumer(); + + while 
(running) { + fetchDataFromKafka(); + } + + cleanUp(); + } + + /** + * Method for fetching data from Apache Kafka. It takes care of received + * data + */ + private void fetchDataFromKafka() { + if (consumer != null) { + if (!consumer.subscription().isEmpty()) { + try { + List<byte[]> kafkaMsg = getMessagesBytes(consumer.poll(consumerTimeout)); + fillBufferAndNotifyWaits(kafkaMsg); + } catch (Throwable t) { + Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, t); + } + } + } + } + + /** + * Copies received messages to class buffer and notifies Processor to grab + * the data. + * + * @param kafkaMsg Messages received from Kafka + */ + private void fillBufferAndNotifyWaits(List<byte[]> kafkaMsg) { + synchronized (lock) { + buffer.addAll(kafkaMsg); + if (buffer.size() > 0) { + lock.notifyAll(); + } + } + } + + private void cleanUp() { + // clean resources + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); + } + } + + private void initializeConsumer() { + // lazy instantiation + log.log(Level.INFO, "Instantiating Kafka consumer"); + if (consumer == null) { + consumer = new KafkaConsumer<>(consumerProperties); + running = true; + } + consumer.subscribe(topics); + } + + private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) { + Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator(); + List<byte[]> ret = new ArrayList<>(); + while (iterator.hasNext()) { + ret.add(iterator.next().value()); + } + return ret; + } + + void close() { + running = false; + } + + List<byte[]> getKafkaMessages() { + synchronized (lock) { + if (buffer.isEmpty()) { + try { + // block the call until new messages are received + lock.wait(); + } catch (InterruptedException ex) { + Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, ex); + } + } + ArrayList<byte[]> ret = new ArrayList<>(); + // copy buffer to return list + ret.addAll(buffer); + // clear message buffer + buffer.clear(); + return 
ret; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index 2b0b808..7079c58 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -104,7 +104,7 @@ public class KafkaEntranceProcessor implements EntranceProcessor { @Override public ContentEvent nextEvent() { - // assume this will never be called when buffer is empty! + // assume this will never be called when buffer is empty! return this.deserializer.deserialize(buffer.remove(buffer.size() - 1)); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java index 0635877..75b5402 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -61,8 +61,7 @@ import java.util.logging.Logger; */ class KafkaUtils { - // Consumer class for internal use to retrieve messages from Kafka - private transient KafkaConsumer<String, byte[]> consumer; + private transient KafkaConsumerThread kafkaConsumerThread; private transient KafkaProducer<String, byte[]> producer; @@ -72,6 +71,7 @@ class KafkaUtils { // Timeout for Kafka Consumer private long consumerTimeout; + /** * Class constructor @@ -86,6 +86,10 @@ class KafkaUtils { 
this.consumerTimeout = consumerTimeout; } + /** + * Creates new KafkaUtils from existing instance + * @param kafkaUtils Instance of KafkaUtils + */ KafkaUtils(KafkaUtils kafkaUtils) { this.consumerProperties = kafkaUtils.consumerProperties; this.producerProperties = kafkaUtils.producerProperties; @@ -93,25 +97,18 @@ class KafkaUtils { } /** - * Method used to initialize Kafka Consumer, i.e. instantiate it and + * Method used to initialize Kafka Consumer Thread, i.e. instantiate it and * subscribe to configured topic * * @param topics List of Kafka topics that consumer should subscribe to */ - public void initializeConsumer(Collection<String> topics) { - // lazy instantiation - if (consumer == null) { - consumer = new KafkaConsumer<>(consumerProperties); - } - consumer.subscribe(topics); -// consumer.seekToBeginning(consumer.assignment()); + public void initializeConsumer(Collection<String> topics) { + kafkaConsumerThread = new KafkaConsumerThread(consumerProperties, topics, consumerTimeout); + kafkaConsumerThread.start(); } public void closeConsumer() { - if (consumer != null) { - consumer.unsubscribe(); - consumer.close(); - } + kafkaConsumerThread.close(); } public void initializeProducer() { @@ -135,27 +132,7 @@ class KafkaUtils { * or is not subscribed to any topic. 
*/ public List<byte[]> getKafkaMessages() throws Exception { - - if (consumer != null) { - if (!consumer.subscription().isEmpty()) { - return getMessagesBytes(consumer.poll(consumerTimeout)); - } else { - // TODO: do it more elegant way - throw new Exception("Consumer subscribed to no topics!"); - } - } else { - // TODO: do more elegant way - throw new Exception("Consumer not initialised"); - } - } - - private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) { - Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator(); - List<byte[]> ret = new ArrayList<>(); - while (iterator.hasNext()) { - ret.add(iterator.next().value()); - } - return ret; + return kafkaConsumerThread.getKafkaMessages(); } public long sendKafkaMessage(String topic, byte[] message) { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java index a138763..bf45ffb 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java @@ -15,6 +15,25 @@ */ package org.apache.samoa.streams.kafka; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; @@ -51,7 +70,7 @@ import org.junit.Test; /** * - * @author pwawrzyniak <your.name at your.org> + * @author pwawrzyniak */ public class KafkaDestinationProcessorTest { @@ -59,7 +78,7 @@ public class KafkaDestinationProcessorTest { private static final String BROKERHOST = "127.0.0.1"; private static final String BROKERPORT = "9092"; private static final String TOPIC = "test-kdp"; - private static final int NUM_INSTANCES = 500; + private static final int NUM_INSTANCES = 11111; private static final int CONSUMER_TIMEOUT = 1000; private static KafkaServer kafkaServer; @@ -94,7 +113,7 @@ public class KafkaDestinationProcessorTest { } @AfterClass - public static void tearDownClass() { + public static void tearDownClass() { kafkaServer.shutdown(); zkClient.close(); zkServer.shutdown(); @@ -114,34 +133,28 @@ public class KafkaDestinationProcessorTest { public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException { final Logger logger = Logger.getLogger(KafkaDestinationProcessorTest.class.getName()); - Properties props = TestUtilsForKafka.getProducerProperties(); + Properties props = TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT); props.setProperty("auto.offset.reset", "earliest"); KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new KafkaJsonMapper(Charset.defaultCharset())); kdp.onCreate(1); final int[] i = {0}; - -// prepare new thread for data receiveing + + // prepare new 
thread for data receiveing Thread th = new Thread(new Runnable() { @Override public void run() { - KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties()); + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT)); consumer.subscribe(Arrays.asList(TOPIC)); while (i[0] < NUM_INSTANCES) { try { ConsumerRecords<String, byte[]> cr = consumer.poll(CONSUMER_TIMEOUT); - Iterator<ConsumerRecord<String, byte[]>> it = cr.iterator(); while (it.hasNext()) { ConsumerRecord<String, byte[]> record = it.next(); - logger.info(new String(record.value())); - logger.log(Level.INFO, "Current read offset is: {0}", record.offset()); i[0]++; } - - Thread.sleep(1); - - } catch (InterruptedException ex) { + } catch (Exception ex) { Logger.getLogger(KafkaDestinationProcessorTest.class.getName()).log(Level.SEVERE, null, ex); } } @@ -157,11 +170,11 @@ public class KafkaDestinationProcessorTest { for (z = 0; z < NUM_INSTANCES; z++) { InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); kdp.process(event); - logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z}); - Thread.sleep(5); +// logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z}); } + // wait for all instances to be read - Thread.sleep(100); + Thread.sleep(2 * CONSUMER_TIMEOUT); assertEquals("Number of sent and received instances", z, i[0]); } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java index bc2a11e..009a6a7 100644 --- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -38,7 +38,6 @@ import com.google.gson.Gson; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; -import java.util.ArrayList; import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutionException; @@ -46,17 +45,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; -import mockit.Mocked; -import mockit.Tested; -import mockit.Expectations; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.Processor; import org.apache.samoa.learners.InstanceContentEvent; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.*; import kafka.admin.AdminUtils; @@ -72,36 +65,26 @@ import kafka.zk.EmbeddedZookeeper; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.samoa.instances.Attribute; -import org.apache.samoa.instances.DenseInstance; -import org.apache.samoa.instances.Instance; -import org.apache.samoa.instances.Instances; import org.apache.samoa.instances.InstancesHeader; -import org.apache.samoa.moa.core.FastVector; -import org.apache.samoa.moa.core.InstanceExample; -import org.apache.samoa.streams.InstanceStream; /** * * @author pwawrzyniak */ -//@Ignore public class KafkaEntranceProcessorTest { -// @Tested -// private KafkaEntranceProcessor kep; - private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 - private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 - private static final String BROKERPORT = "6667"; //6667, local: 
9092 - private static final String TOPIC = "samoa_test"; //samoa_test, local: test - private static final int NUM_INSTANCES = 50; - - + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_AVRO = "samoa_test-avro"; + private static final String TOPIC_JSON = "samoa_test-json"; + private static final int NUM_INSTANCES = 11111; + private static KafkaServer kafkaServer; private static EmbeddedZookeeper zkServer; private static ZkClient zkClient; private static String zkConnect; - + private static int TIMEOUT = 1000; public KafkaEntranceProcessorTest() { } @@ -110,30 +93,35 @@ public class KafkaEntranceProcessorTest { public static void setUpClass() throws IOException { // setup Zookeeper zkServer = new EmbeddedZookeeper(); - zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); + zkConnect = ZKHOST + ":" + zkServer.port(); zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils.apply(zkClient, false); // setup Broker - /*Properties brokerProps = new Properties(); + Properties brokerProps = new Properties(); brokerProps.setProperty("zookeeper.connect", zkConnect); brokerProps.setProperty("broker.id", "0"); brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); KafkaConfig config = new KafkaConfig(brokerProps); Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock);*/ + kafkaServer = TestUtils.createServer(config, mock); - // create topic - //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$); } @AfterClass public static void tearDownClass() { - //kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); + try { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } } @Before @@ -146,20 +134,23 @@ public class KafkaEntranceProcessorTest { } - /*@Test - public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException { + @Test + public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException { Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); - Properties props = TestUtilsForKafka.getConsumerProperties(); + logger.log(Level.INFO, "JSON"); + logger.log(Level.INFO, "testFetchingNewDataWithJson"); + Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); props.setProperty("auto.offset.reset", "earliest"); - KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 10000, new KafkaJsonMapper(Charset.defaultCharset())); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset())); + kep.onCreate(1); -// prepare new thread for data producing + // prepare new thread for data producing Thread th = new Thread(new Runnable() { @Override public void run() { - KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties()); + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); Random r = new Random(); InstancesHeader header = TestUtilsForKafka.generateHeader(10); @@ -167,10 +158,9 @@ public class KafkaEntranceProcessorTest { int i = 0; for (i = 0; i < NUM_INSTANCES; i++) { try { - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, 
gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); - long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); - Thread.sleep(5); - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes()); + long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); } catch (InterruptedException | ExecutionException | TimeoutException ex) { Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); } @@ -182,45 +172,44 @@ public class KafkaEntranceProcessorTest { th.start(); int z = 0; - while (kep.hasNext() && z < NUM_INSTANCES) { - logger.log(Level.INFO, "{0} {1}", new Object[]{z++, kep.nextEvent().toString()}); - } + while (z < NUM_INSTANCES && kep.hasNext()) { + InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); + z++; +// logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()}); + } + + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + + } - assertEquals("Number of sent and received instances", NUM_INSTANCES, z); - - - }*/ - @Test public void testFetchingNewDataWithAvro() throws InterruptedException, ExecutionException, TimeoutException { Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); logger.log(Level.INFO, "AVRO"); - logger.log(Level.INFO, "testFetchingNewDataWithAvro"); - Properties props = TestUtilsForKafka.getConsumerProperties(); + logger.log(Level.INFO, "testFetchingNewDataWithAvro"); + Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); props.setProperty("auto.offset.reset", "earliest"); - KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 10000, new KafkaAvroMapper()); + 
KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_AVRO, TIMEOUT, new KafkaAvroMapper()); kep.onCreate(1); // prepare new thread for data producing Thread th = new Thread(new Runnable() { @Override public void run() { - KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties()); + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); Random r = new Random(); InstancesHeader header = TestUtilsForKafka.generateHeader(10); - KafkaAvroMapper avroMapper = new KafkaAvroMapper(); + int i = 0; for (i = 0; i < NUM_INSTANCES; i++) { try { - //byte[] data = avroMapper.serialize(TestUtilsForKafka.getData(r, 10, header)); - byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header)); - if(data == null) - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ("+i+")"); - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, data); - long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); - Thread.sleep(5); - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent avro message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header)); + if (data == null) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ({0})", i); + } + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_AVRO, data); + long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); } catch (InterruptedException | ExecutionException | TimeoutException ex) { Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); } @@ -232,34 +221,12 @@ public class KafkaEntranceProcessorTest { th.start(); int z = 0; - while 
(kep.hasNext() && z < NUM_INSTANCES) { - InstanceContentEvent event = (InstanceContentEvent)kep.nextEvent(); - logger.log(Level.INFO, "{0} {1}", new Object[]{z++, event.getInstance().toString()}); - } + while (z < NUM_INSTANCES && kep.hasNext()) { + InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); + z++; +// logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()}); + } - assertEquals("Number of sent and received instances", NUM_INSTANCES, z); - - + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); } - -// private Properties getProducerProperties() { -// Properties producerProps = new Properties(); -//// props.setProperty("zookeeper.connect", zkConnect); -// producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); -// producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); -// producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); -//// producerProps.setProperty("group.id", "test"); -// return producerProps; -// } -// -// private Properties getConsumerProperties() { -// Properties consumerProps = new Properties(); -// consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); -// consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); -// consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); -//// consumerProps.setProperty("group.id", "test"); -// consumerProps.setProperty("group.id", "group0"); -// consumerProps.setProperty("client.id", "consumer0"); -// return consumerProps; - } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java ---------------------------------------------------------------------- diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java index 31f34fb..08aae11 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java @@ -15,11 +15,14 @@ */ package org.apache.samoa.streams.kafka; +import com.google.gson.Gson; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.util.Properties; +import java.util.Random; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -44,6 +47,9 @@ import kafka.utils.TestUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samoa.instances.InstancesHeader; /* * #%L @@ -72,11 +78,11 @@ import kafka.zk.EmbeddedZookeeper; @Ignore public class KafkaTaskTest { - private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 - private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 - private static final String BROKERPORT = "6667"; //6667, local: 9092 + private static final String ZKHOST = "127.0.0.1";//10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "9092"; //6667, local: 9092 private static final String TOPIC = "samoa_test"; //samoa_test, local: test - private static final int NUM_INSTANCES = 500; + private static final int NUM_INSTANCES = 125922; private static KafkaServer kafkaServer; @@ -87,10 +93,10 @@ public class KafkaTaskTest { @BeforeClass public static void setUpClass() throws IOException { 
// setup Zookeeper - zkServer = new EmbeddedZookeeper(); - zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - ZkUtils zkUtils = ZkUtils.apply(zkClient, false); +// zkServer = new EmbeddedZookeeper(); +// zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); +// zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); +// ZkUtils zkUtils = ZkUtils.apply(zkClient, false); // setup Broker /*Properties brokerProps = new Properties(); @@ -109,8 +115,8 @@ public class KafkaTaskTest { @AfterClass public static void tearDownClass() { //kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); +// zkClient.close(); +// zkServer.shutdown(); } @Before @@ -127,12 +133,38 @@ public class KafkaTaskTest { public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException { Logger logger = Logger.getLogger(KafkaTaskTest.class.getName()); logger.log(Level.INFO, "KafkaTask"); - Properties producerProps = TestUtilsForKafka.getProducerProperties(); - Properties consumerProps = TestUtilsForKafka.getConsumerProperties(); + Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT); + Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT); KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new KafkaJsonMapper(Charset.defaultCharset()), new KafkaJsonMapper(Charset.defaultCharset())); task.setFactory(new SimpleComponentFactory()); task.init(); SimpleEngine.submitTopology(task.getTopology()); + + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 0; + for (i = 0; i < 
NUM_INSTANCES; i++) { + try { + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); + long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); +// Thread.sleep(5); + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java index 7c1c7c0..e2b36fd 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -34,8 +34,6 @@ package org.apache.samoa.streams.kafka; * limitations under the License. 
* #L% */ - - import com.google.gson.Gson; import java.io.IOException; import java.nio.file.Files; @@ -47,7 +45,6 @@ import java.util.List; import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -70,32 +67,31 @@ import org.apache.kafka.common.utils.Time; import org.apache.samoa.instances.InstancesHeader; import org.junit.After; import org.junit.AfterClass; +import static org.junit.Assert.*; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; -import static org.junit.Assert.*; /** * * @author pwawrzyniak */ -@Ignore public class KafkaUtilsTest { - private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 - private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 - private static final String BROKERPORT = "6667"; //6667, local: 9092 + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; private static final String TOPIC_R = "test-r"; private static final String TOPIC_S = "test-s"; + private static final int NUM_INSTANCES = 50; private static KafkaServer kafkaServer; private static EmbeddedZookeeper zkServer; private static ZkClient zkClient; private static String zkConnect; - private Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); - private long CONSUMER_TIMEOUT = 1000; + private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); + private final long CONSUMER_TIMEOUT = 1000; public KafkaUtilsTest() { } @@ -104,29 +100,29 @@ public class KafkaUtilsTest { public static void setUpClass() throws IOException { // setup Zookeeper zkServer = new EmbeddedZookeeper(); - zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); + 
zkConnect = ZKHOST + ":" + zkServer.port(); zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils.apply(zkClient, false); // setup Broker - /*Properties brokerProps = new Properties(); + Properties brokerProps = new Properties(); brokerProps.setProperty("zookeeper.connect", zkConnect); brokerProps.setProperty("broker.id", "0"); brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); KafkaConfig config = new KafkaConfig(brokerProps); Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock);*/ + kafkaServer = TestUtils.createServer(config, mock); // create topics - //AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - //AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); } @AfterClass public static void tearDownClass() { - //kafkaServer.shutdown(); + kafkaServer.shutdown(); zkClient.close(); zkServer.shutdown(); } @@ -146,13 +142,19 @@ public class KafkaUtilsTest { public void testInitializeConsumer() throws Exception { logger.log(Level.INFO, "initializeConsumer"); Collection<String> topics = Arrays.asList(TOPIC_R); - KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); assertNotNull(instance); instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + + 
Thread.sleep(CONSUMER_TIMEOUT); - assertNotNull(instance.getKafkaMessages()); + instance.initializeConsumer(topics); + Thread.sleep(1000); instance.closeConsumer(); + assertTrue(true); } /** @@ -162,14 +164,17 @@ public class KafkaUtilsTest { public void testGetKafkaMessages() throws Exception { logger.log(Level.INFO, "getKafkaMessages"); Collection<String> topics = Arrays.asList(TOPIC_R); - KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); assertNotNull(instance); logger.log(Level.INFO, "Initialising consumer"); instance.initializeConsumer(topics); logger.log(Level.INFO, "Produce data"); - List expResult = sendAndGetMessages(50); + List expResult = sendAndGetMessages(NUM_INSTANCES); + + logger.log(Level.INFO, "Wait a moment"); + Thread.sleep(CONSUMER_TIMEOUT); logger.log(Level.INFO, "Get results from Kafka"); List<byte[]> result = instance.getKafkaMessages(); @@ -180,7 +185,7 @@ public class KafkaUtilsTest { private List<byte[]> sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException { List<byte[]> ret; - try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test"))) { + try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test",BROKERHOST,BROKERPORT))) { ret = new ArrayList<>(); Random r = new Random(); InstancesHeader header = TestUtilsForKafka.generateHeader(10); @@ -190,25 +195,28 @@ public class KafkaUtilsTest { ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); ret.add(record.value()); producer.send(record); - } producer.flush(); + } + 
producer.flush(); } return ret; } /** * Test of sendKafkaMessage method, of class KafkaUtils. + * + * @throws java.lang.InterruptedException */ @Test - public void testSendKafkaMessage() { + public void testSendKafkaMessage() throws InterruptedException { logger.log(Level.INFO, "sendKafkaMessage"); logger.log(Level.INFO, "Initialising producer"); - KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties("rcv-test"), CONSUMER_TIMEOUT); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); instance.initializeProducer(); logger.log(Level.INFO, "Initialising consumer"); KafkaConsumer<String, byte[]> consumer; - consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties()); + consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT)); consumer.subscribe(Arrays.asList(TOPIC_S)); logger.log(Level.INFO, "Produce data"); @@ -216,11 +224,13 @@ public class KafkaUtilsTest { Random r = new Random(); InstancesHeader header = TestUtilsForKafka.generateHeader(10); Gson gson = new Gson(); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < NUM_INSTANCES; i++) { byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); sent.add(val); instance.sendKafkaMessage(TOPIC_S, val); } + // wait for Kafka a bit :) + Thread.sleep(CONSUMER_TIMEOUT); logger.log(Level.INFO, "Get results from Kafka"); ConsumerRecords<String, byte[]> records = consumer.poll(CONSUMER_TIMEOUT); http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java index 8d85fd7..87ab16c 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java @@ -48,14 +48,12 @@ import org.apache.samoa.moa.core.FastVector; /** * - * @author pwawrzyniak <your.name at your.org> + * @author pwawrzyniak */ public class TestUtilsForKafka { - private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 - private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 - private static final String BROKERPORT = "6667"; //6667, local: 9092 - private static final String TOPIC = "samoa_test"; //samoa_test, local: test +// private static final String BROKERHOST = "127.0.0.1"; +// private static final String BROKERPORT = "9092"; protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { double[] attVals = new double[numAtts + 1]; @@ -63,8 +61,7 @@ public class TestUtilsForKafka { double sumWeights = 0.0; for (int i = 0; i < numAtts; i++) { attVals[i] = instanceRandom.nextDouble(); -// sum += this.weights[i] * attVals[i]; -// sumWeights += this.weights[i]; + } int classLabel; if (sum >= sumWeights * 0.5) { @@ -98,8 +95,8 @@ public class TestUtilsForKafka { } - protected static Properties getProducerProperties() { - return getProducerProperties("test"); + protected static Properties getProducerProperties(String BROKERHOST, String BROKERPORT) { + return getProducerProperties("test", BROKERHOST, BROKERPORT); } /** @@ -107,7 +104,7 @@ public class TestUtilsForKafka { * @param clientId * @return */ - protected static Properties getProducerProperties(String clientId) { + protected static Properties getProducerProperties(String clientId, String BROKERHOST, String BROKERPORT) { Properties producerProps = new Properties(); producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT); producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -117,7 +114,7 @@ public class TestUtilsForKafka { return producerProps; } - protected static Properties getConsumerProperties() { + protected static Properties getConsumerProperties(String BROKERHOST, String BROKERPORT) { Properties consumerProps = new Properties(); consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); consumerProps.put("enable.auto.commit", "true"); @@ -126,11 +123,10 @@ public class TestUtilsForKafka { consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProps.setProperty("group.id", "test"); consumerProps.setProperty("auto.offset.reset", "earliest"); - //consumerProps.setProperty("client.id", "consumer0"); return consumerProps; } - protected static Properties getConsumerProducerProperties() { + protected static Properties getConsumerProducerProperties(String BROKERHOST, String BROKERPORT) { Properties props = new Properties(); props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); props.put("enable.auto.commit", "true");
