Tests for KafkaDestinationProcessor, minor changes in classes

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/2a4bec9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/2a4bec9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/2a4bec9e

Branch: refs/heads/master
Commit: 2a4bec9eacc061f11237ec7b7035da8b564df339
Parents: 43f69c6
Author: pwawrzyniak <[email protected]>
Authored: Tue Apr 11 16:44:30 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../kafka/KafkaDestinationProcessor.java        |  50 +++---
 .../apache/samoa/streams/kafka/KafkaUtils.java  |   6 +
 .../kafka/KafkaDestinationProcessorTest.java    | 167 +++++++++++++++++++
 3 files changed, 201 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/2a4bec9e/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
index 67dfbaa..420d43c 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -13,28 +13,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 
 import java.util.Properties;
 import java.util.logging.Level;
@@ -50,6 +50,12 @@ import org.apache.samoa.core.Processor;
  */
 public class KafkaDestinationProcessor implements Processor {
 
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        kafkaUtils.closeProducer();
+    }
+
     private final KafkaUtils kafkaUtils;
     private final String topic;
     private final KafkaSerializer serializer;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/2a4bec9e/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index f5227d3..0635877 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -121,6 +121,12 @@ class KafkaUtils {
         }
     }
 
+    public void closeProducer(){
+        if(producer != null){
+            producer.close(1, TimeUnit.MINUTES);
+        }
+    }
+    
     /**
      * Method for reading new messages from Kafka topics
      *

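Taken together, the two hunks above tie producer cleanup to finalization: when the destination processor is reclaimed, the KafkaProducer held by KafkaUtils is closed with a bounded wait. A condensed, illustrative sketch of that shutdown path follows; the producer field, its type parameters, and the sketch class names are assumptions, and only closeProducer() and finalize() actually come from this patch.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.KafkaProducer;

    // Stand-in for KafkaUtils: closes the wrapped producer, waiting at most one minute.
    class KafkaUtilsSketch {
        private KafkaProducer<String, byte[]> producer;   // assumed field name and type parameters

        public void closeProducer() {
            if (producer != null) {
                producer.close(1, TimeUnit.MINUTES);      // bounded, blocking close
            }
        }
    }

    // Stand-in for KafkaDestinationProcessor: releases the producer when finalized.
    class KafkaDestinationProcessorSketch {
        private final KafkaUtilsSketch kafkaUtils = new KafkaUtilsSketch();

        @Override
        protected void finalize() throws Throwable {
            super.finalize();
            kafkaUtils.closeProducer();                   // free broker connections and buffers
        }
    }
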
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/2a4bec9e/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
new file mode 100644
index 0000000..a138763
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * @author pwawrzyniak <your.name at your.org>
+ */
+public class KafkaDestinationProcessorTest {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "test-kdp";
+    private static final int NUM_INSTANCES = 500;
+    private static final int CONSUMER_TIMEOUT = 1000;
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+
+    public KafkaDestinationProcessorTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topic
+        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() throws IOException {
+
+    }
+
+    @After
+    public void tearDown() {
+
+    }
+
+    @Test
+    public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException {
+
+        final Logger logger = Logger.getLogger(KafkaDestinationProcessorTest.class.getName());
+        Properties props = TestUtilsForKafka.getProducerProperties();
+        props.setProperty("auto.offset.reset", "earliest");
+        KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new KafkaJsonMapper(Charset.defaultCharset()));
+        kdp.onCreate(1);
+
+        final int[] i = {0};
+        
+//         prepare new thread for data receiving
+        Thread th = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties());
+                consumer.subscribe(Arrays.asList(TOPIC));
+                while (i[0] < NUM_INSTANCES) {
+                    try {
+                        ConsumerRecords<String, byte[]> cr = consumer.poll(CONSUMER_TIMEOUT);
+
+                        Iterator<ConsumerRecord<String, byte[]>> it = cr.iterator();
+                        while (it.hasNext()) {
+                            ConsumerRecord<String, byte[]> record = it.next();
+                            logger.info(new String(record.value()));
+                            logger.log(Level.INFO, "Current read offset is: {0}", record.offset());
+                            i[0]++;
+                        }
+                        
+                        Thread.sleep(1);
+
+                    } catch (InterruptedException ex) {
+                        Logger.getLogger(KafkaDestinationProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+                    }
+                }
+                consumer.close();
+            }
+        });
+        th.start();
+
+        int z = 0;
+        Random r = new Random();
+        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+
+        for (z = 0; z < NUM_INSTANCES; z++) {
+            InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+            kdp.process(event);
+            logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z});
+            Thread.sleep(5);
+        }
+        // wait for all instances to be read
+        Thread.sleep(100);        
+        assertEquals("Number of sent and received instances", z, i[0]);
+    }
+}
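Note that the test depends on a TestUtilsForKafka helper that is not included in this commit. For orientation only, below is a minimal sketch of the two property factories such a helper would need, wired to the embedded broker configured above (127.0.0.1:9092); the sketch class name, group id, and serializer choices are assumptions, and the generateHeader/getData helpers are omitted because they depend on SAMOA's instance API.

    import java.util.Properties;

    // Hypothetical stand-in for the TestUtilsForKafka helper referenced by the test.
    final class TestUtilsForKafkaSketch {

        // Producer settings pointing at the embedded broker started in setUpClass().
        static Properties getProducerProperties() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "127.0.0.1:9092");
            props.setProperty("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            return props;
        }

        // Consumer settings for the verification thread (matches KafkaConsumer<String, byte[]>).
        static Properties getConsumerProperties() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "127.0.0.1:9092");
            props.setProperty("group.id", "kdp-test");    // assumed group id
            props.setProperty("auto.offset.reset", "earliest");
            props.setProperty("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            return props;
        }
    }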
