kafka IT

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/409b4482
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/409b4482
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/409b4482

Branch: refs/heads/STREAMS-49
Commit: 409b44827e047a2bd2c07effb5f4127e04a4b7da
Parents: 85e0b50
Author: sblackmon <[email protected]>
Authored: Thu Nov 6 13:28:37 2014 -0800
Committer: sblackmon <[email protected]>
Committed: Thu Nov 6 13:28:37 2014 -0800

----------------------------------------------------------------------
 streams-contrib/streams-persist-kafka/pom.xml   |  24 ++++
 .../apache/streams/kafka/KafkaConfigurator.java |  20 ++--
 .../streams/kafka/KafkaPersistReaderTask.java   |  22 ++--
 .../streams/kafka/KafkaPersistWriter.java       |  15 +--
 .../streams/kafka/StreamsPartitioner.java       |   2 +-
 .../streams/kafka/test/KafkaPersistIT.java      | 109 +++++++++++++++++++
 6 files changed, 165 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index 4e9e375..6d98c36 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -85,6 +85,30 @@
             <version>2.6.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
index 9f64ae6..4a835e8 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
@@ -18,7 +18,9 @@
 
 package org.apache.streams.kafka;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,18 +32,18 @@ public class KafkaConfigurator {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConfigurator.class);
 
+    private final static ObjectMapper mapper = new ObjectMapper();
+
     public static KafkaConfiguration detectConfiguration(Config kafka) {
-        String brokerlist = StreamsConfigurator.config.getString("kafka.metadata.broker.list");
-        String zkconnect = StreamsConfigurator.config.getString("kafka.zkconnect");
-        String topic = StreamsConfigurator.config.getString("kafka.topic");
-        String groupId = StreamsConfigurator.config.getString("kafka.groupid");
 
-        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+        KafkaConfiguration kafkaConfiguration = null;
 
-        kafkaConfiguration.setBrokerlist(brokerlist);
-        kafkaConfiguration.setZkconnect(zkconnect);
-        kafkaConfiguration.setTopic(topic);
-        kafkaConfiguration.setGroupId(groupId);
+        try {
+            kafkaConfiguration = mapper.readValue(kafka.root().render(ConfigRenderOptions.concise()), KafkaConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse KafkaConfiguration");
+        }
 
         return kafkaConfiguration;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
index 03fa291..3c71398 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -33,30 +33,38 @@ public class KafkaPersistReaderTask implements Runnable {
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
 
     private KafkaPersistReader reader;
-    private KafkaStream<String,String> stream;
+    private KafkaStream<String,Object> stream;
 
-    public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
+    public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,Object> stream) {
         this.reader = reader;
         this.stream = stream;
     }
 
-
-
     @Override
     public void run() {
 
         Preconditions.checkNotNull(this.stream);
 
-        ConsumerIterator<String, String> it = stream.iterator();
+        ConsumerIterator<String, Object> it = stream.iterator();
         while (it.hasNext()) {
-            MessageAndMetadata<String,String> item = it.next();
-            reader.persistQueue.add(new StreamsDatum(item.message()));
+            MessageAndMetadata<String,Object> item = it.next();
+            write(new StreamsDatum(item.message()));
         }
         try {
             Thread.sleep(new Random().nextInt(100));
         } catch (InterruptedException e) {}
 
+    }
 
+    private void write( StreamsDatum entry ) {
+        boolean success;
+        do {
+            synchronized( KafkaPersistReader.class ) {
+                success = reader.persistQueue.offer(entry);
+            }
+            Thread.yield();
+        }
+        while( !success );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 01db32f..2937b7a 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -46,7 +46,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
     private KafkaConfiguration config;
 
-    private Producer<String, String> producer;
+    private Producer<String, Object> producer;
 
     public KafkaPersistWriter() {
        this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka")));
@@ -67,7 +67,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
         ProducerConfig config = new ProducerConfig(props);
 
-        producer = new Producer<String, String>(config);
+        producer = new Producer<String, Object>(config);
 
         new Thread(new KafkaPersistWriterTask(this)).start();
     }
@@ -75,16 +75,11 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
     @Override
     public void write(StreamsDatum entry) {
 
-        try {
-            String text = mapper.writeValueAsString(entry);
+        String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("kafkawriter");
 
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), entry.getId(), text);
+        KeyedMessage<String, Object> data = new KeyedMessage<>(config.getTopic(), key, entry.getDocument());
 
-            producer.send(data);
-
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("save: {}", e);
-        }// put
+        producer.send(data);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
index fa38ca2..b1f2a28 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
@@ -32,7 +32,7 @@ public class StreamsPartitioner implements Partitioner {
 
     public int partition(Object key, int a_numPartitions) {
         int partition = 0;
-        partition = key.hashCode() % a_numPartitions;
+
         return partition;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
new file mode 100644
index 0000000..2817fca
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java
@@ -0,0 +1,109 @@
+package org.apache.streams.kafka.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import javassist.bytecode.stackmap.TypeData;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.streams.console.ConsolePersistReader;
+import org.apache.streams.console.ConsolePersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.kafka.KafkaConfiguration;
+import org.apache.streams.kafka.KafkaPersistReader;
+import org.apache.streams.kafka.KafkaPersistWriter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class KafkaPersistIT {
+
+    private TestKafkaCluster testKafkaCluster;
+    private KafkaConfiguration testConfiguration;
+
+    private String testTopic = "testTopic";
+
+    ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class);
+    ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class);
+
+    StreamsDatum testDatum = new StreamsDatum("{\"dummy\":\"true\"}");
+
+    @Before
+    public void prepareTest() {
+
+        try {
+            testKafkaCluster = new TestKafkaCluster();
+        } catch (Throwable e ) {
+            e.printStackTrace();
+        }
+
+        testConfiguration = new KafkaConfiguration();
+        testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString());
+        testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString());
+        testConfiguration.setTopic(testTopic);
+
+        ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$);
+
+        AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties());
+
+        PowerMockito.when(reader.readCurrent())
+                .thenReturn(
+                        new StreamsResultSet(Queues.newConcurrentLinkedQueue(
+                                Lists.newArrayList(testDatum)))
+                );
+    }
+
+    @Test
+    public void testPersistStream() {
+
+        assert(testConfiguration != null);
+        assert(testKafkaCluster != null);
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000);
+
+        StreamBuilder builder = new LocalStreamBuilder(1, streamConfig);
+
+        KafkaPersistWriter kafkaWriter = new KafkaPersistWriter(testConfiguration);
+        KafkaPersistReader kafkaReader = new KafkaPersistReader(testConfiguration);
+
+        builder.newReadCurrentStream("stdin", reader);
+        builder.addStreamsPersistWriter("writer", kafkaWriter, 1, "stdin");
+        builder.newPerpetualStream("reader", kafkaReader);
+        builder.addStreamsPersistWriter("stdout", writer, 1, "reader");
+
+        builder.start();
+
+        builder.stop();
+
+        Mockito.verify(writer).write(testDatum);
+
+    }
+}

Reply via email to