kafka IT
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/409b4482 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/409b4482 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/409b4482 Branch: refs/heads/STREAMS-49 Commit: 409b44827e047a2bd2c07effb5f4127e04a4b7da Parents: 85e0b50 Author: sblackmon <[email protected]> Authored: Thu Nov 6 13:28:37 2014 -0800 Committer: sblackmon <[email protected]> Committed: Thu Nov 6 13:28:37 2014 -0800 ---------------------------------------------------------------------- streams-contrib/streams-persist-kafka/pom.xml | 24 ++++ .../apache/streams/kafka/KafkaConfigurator.java | 20 ++-- .../streams/kafka/KafkaPersistReaderTask.java | 22 ++-- .../streams/kafka/KafkaPersistWriter.java | 15 +-- .../streams/kafka/StreamsPartitioner.java | 2 +- .../streams/kafka/test/KafkaPersistIT.java | 109 +++++++++++++++++++ 6 files changed, 165 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml index 4e9e375..6d98c36 100644 --- a/streams-contrib/streams-persist-kafka/pom.xml +++ b/streams-contrib/streams-persist-kafka/pom.xml @@ -85,6 +85,30 @@ <version>2.6.0</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-runtime-local</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-persist-console</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java index 9f64ae6..4a835e8 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java @@ -18,7 +18,9 @@ package org.apache.streams.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; import org.apache.streams.config.StreamsConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,18 +32,18 @@ public class KafkaConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConfigurator.class); + private final static ObjectMapper mapper = new ObjectMapper(); + public static KafkaConfiguration detectConfiguration(Config kafka) { - String brokerlist = StreamsConfigurator.config.getString("kafka.metadata.broker.list"); - String zkconnect = StreamsConfigurator.config.getString("kafka.zkconnect"); - String topic = StreamsConfigurator.config.getString("kafka.topic"); - String groupId = StreamsConfigurator.config.getString("kafka.groupid"); - KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); + KafkaConfiguration kafkaConfiguration = null; - kafkaConfiguration.setBrokerlist(brokerlist); - kafkaConfiguration.setZkconnect(zkconnect); - kafkaConfiguration.setTopic(topic); - kafkaConfiguration.setGroupId(groupId); + try { + kafkaConfiguration = mapper.readValue(kafka.root().render(ConfigRenderOptions.concise()), KafkaConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse KafkaConfiguration"); + } return kafkaConfiguration; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java index 03fa291..3c71398 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java @@ -33,30 +33,38 @@ public class KafkaPersistReaderTask implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class); private KafkaPersistReader reader; - private KafkaStream<String,String> stream; + private KafkaStream<String,Object> stream; - public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) { + public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,Object> stream) { this.reader = reader; this.stream = stream; } - - @Override public void run() { Preconditions.checkNotNull(this.stream); - ConsumerIterator<String, String> it = stream.iterator(); + ConsumerIterator<String, Object> it = stream.iterator(); while (it.hasNext()) { - MessageAndMetadata<String,String> item = it.next(); - reader.persistQueue.add(new StreamsDatum(item.message())); + MessageAndMetadata<String,Object> item = it.next(); + write(new StreamsDatum(item.message())); } try { Thread.sleep(new Random().nextInt(100)); } catch (InterruptedException e) {} + } + private void write( StreamsDatum entry ) { + boolean success; + do { + synchronized( KafkaPersistReader.class ) { + success = reader.persistQueue.offer(entry); + } + Thread.yield(); + } + while( !success ); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java index 01db32f..2937b7a 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java @@ -46,7 +46,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable { private KafkaConfiguration config; - private Producer<String, String> producer; + private Producer<String, Object> producer; public KafkaPersistWriter() { this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka"))); @@ -67,7 +67,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable { ProducerConfig config = new ProducerConfig(props); - producer = new Producer<String, String>(config); + producer = new Producer<String, Object>(config); new Thread(new KafkaPersistWriterTask(this)).start(); } @@ -75,16 +75,11 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable { @Override public void write(StreamsDatum entry) { - try { - String text = mapper.writeValueAsString(entry); + String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("kafkawriter"); - KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), entry.getId(), text); + KeyedMessage<String, Object> data = new KeyedMessage<>(config.getTopic(), key, entry.getDocument()); - producer.send(data); - - } catch (JsonProcessingException e) { - LOGGER.warn("save: {}", e); - }// put + producer.send(data); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java index fa38ca2..b1f2a28 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java @@ -32,7 +32,7 @@ public class StreamsPartitioner implements Partitioner { public int partition(Object key, int a_numPartitions) { int partition = 0; - partition = key.hashCode() % a_numPartitions; + return partition; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b4482/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java new file mode 100644 index 0000000..2817fca --- /dev/null +++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/KafkaPersistIT.java @@ -0,0 +1,109 @@ +package org.apache.streams.kafka.test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import javassist.bytecode.stackmap.TypeData; +import kafka.admin.AdminUtils; +import kafka.utils.ZKStringSerializer$; +import org.I0Itec.zkclient.ZkClient; +import org.apache.streams.console.ConsolePersistReader; +import org.apache.streams.console.ConsolePersistWriter; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.kafka.KafkaConfiguration; +import org.apache.streams.kafka.KafkaPersistReader; +import org.apache.streams.kafka.KafkaPersistWriter; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; +import org.powermock.api.mockito.PowerMockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Map; +import java.util.Properties; + +import static org.mockito.Mockito.mock; + +/** + * Created by sblackmon on 10/20/14. + */ +public class KafkaPersistIT { + + private TestKafkaCluster testKafkaCluster; + private KafkaConfiguration testConfiguration; + + private String testTopic = "testTopic"; + + ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class); + ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class); + + StreamsDatum testDatum = new StreamsDatum("{\"dummy\":\"true\"}"); + + @Before + public void prepareTest() { + + try { + testKafkaCluster = new TestKafkaCluster(); + } catch (Throwable e ) { + e.printStackTrace(); + } + + testConfiguration = new KafkaConfiguration(); + testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString()); + testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString()); + testConfiguration.setTopic(testTopic); + + ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$); + + AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties()); + + PowerMockito.when(reader.readCurrent()) + .thenReturn( + new StreamsResultSet(Queues.newConcurrentLinkedQueue( + Lists.newArrayList(testDatum))) + ); + } + + @Test + public void testPersistStream() { + + assert(testConfiguration != null); + assert(testKafkaCluster != null); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000); + + StreamBuilder builder = new LocalStreamBuilder(1, streamConfig); + + KafkaPersistWriter kafkaWriter = new KafkaPersistWriter(testConfiguration); + KafkaPersistReader kafkaReader = new KafkaPersistReader(testConfiguration); + + builder.newReadCurrentStream("stdin", reader); + builder.addStreamsPersistWriter("writer", kafkaWriter, 1, "stdin"); + builder.newPerpetualStream("reader", kafkaReader); + builder.addStreamsPersistWriter("stdout", writer, 1, "reader"); + + builder.start(); + + builder.stop(); + + Mockito.verify(writer).write(testDatum); + + } +}
