Tests for kafkaUtils and KafkaEntranceProcessor, minor changes in classes Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/43f69c62 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/43f69c62 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/43f69c62
Branch: refs/heads/master Commit: 43f69c6235066ba8cfc49e8079621b71c4f43f4c Parents: 0db63b9 Author: pwawrzyniak <[email protected]> Authored: Fri Mar 24 14:34:49 2017 +0100 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .gitignore | 31 +- samoa-api/pom.xml | 289 ++++++++++--------- .../samoa/streams/kafka/KafkaDeserializer.java | 23 +- .../kafka/KafkaDestinationProcessor.java | 23 +- .../streams/kafka/KafkaEntranceProcessor.java | 28 ++ .../samoa/streams/kafka/KafkaJsonMapper.java | 41 ++- .../samoa/streams/kafka/KafkaSerializer.java | 23 +- .../apache/samoa/streams/kafka/KafkaUtils.java | 79 ++++- .../kafka/KafkaEntranceProcessorTest.java | 212 ++++++++++++++ .../samoa/streams/kafka/KafkaUtilsTest.java | 235 +++++++++++++++ .../samoa/streams/kafka/TestUtilsForKafka.java | 132 +++++++++ 11 files changed, 945 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 294c718..a834232 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,16 @@ -#maven -target/ - -#eclipse -.classpath -.project -.settings/ - -#DS_Store -.DS_Store - -#intellij -.idea/ -*.iml -*.iws +#maven +target/ + +#eclipse +.classpath +.project +.settings/ + +#DS_Store +.DS_Store + +#intellij +.idea/ +*.iml +*.iws +/samoa-api/nbproject/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 4621b93..2b7bd22 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -1,135 +1,154 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - #%L - SAMOA - %% - Copyright (C) 2014 - 2015 Apache Software Foundation - %% - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -#L% ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - - <name>samoa-api</name> - <description>API and algorithms for SAMOA</description> - - <artifactId>samoa-api</artifactId> - <parent> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa</artifactId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> - - <dependencies> - <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics-core.version}</version> - </dependency> - - <dependency> - <groupId>net.jcip</groupId> - <artifactId>jcip-annotations</artifactId> - <version>${jcip-annotations.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - </dependency> - - <dependency> - <groupId>com.github.javacliparser</groupId> - <artifactId>javacliparser</artifactId> - <version>${javacliparser.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa-instances</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - <version>${kryo.version}</version> - </dependency> - - <dependency> - <groupId>com.dreizak</groupId> - <artifactId>miniball</artifactId> - <version>${miniball.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.2.0</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>${maven-dependency-plugin.version}</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>false</overWriteSnapshots> - <overWriteIfNewer>true</overWriteIfNewer> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + #%L + SAMOA + %% + Copyright (C) 2014 - 2015 Apache Software Foundation + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +#L% +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-api</name> + <description>API and algorithms for SAMOA</description> + + <artifactId>samoa-api</artifactId> + <parent> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics-core.version}</version> + </dependency> + + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + <version>${jcip-annotations.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + + <dependency> + <groupId>com.github.javacliparser</groupId> + <artifactId>javacliparser</artifactId> + <version>${javacliparser.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa-instances</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> + + <dependency> + <groupId>com.dreizak</groupId> + <artifactId>miniball</artifactId> + <version>${miniball.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.0</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.2.0</version> +</dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.2.0</version> + <classifier>test</classifier> + <scope>test</scope> +</dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>${maven-dependency-plugin.version}</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java index b85ec1f..7b11cbd 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java @@ -13,7 +13,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.samoa.streams.kafka; +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import org.apache.samoa.core.ContentEvent; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java index 5632b6e..67dfbaa 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java @@ -13,7 +13,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.samoa.streams.kafka; +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import java.util.Properties; import java.util.logging.Level; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index d0a4c0d..2b0b808 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -15,6 +15,27 @@ */ package org.apache.samoa.streams.kafka; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -97,4 +118,11 @@ public class KafkaEntranceProcessor implements EntranceProcessor { KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic); } + + @Override + protected void finalize() throws Throwable { + kafkaUtils.closeConsumer(); + super.finalize(); //To change body of generated methods, choose Tools | Templates. + } + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java index 6ede447..1996b40 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java @@ -15,12 +15,40 @@ */ package org.apache.samoa.streams.kafka; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.InstanceCreator; +import java.lang.reflect.Type; import java.nio.charset.Charset; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.instances.InstanceData; +import org.apache.samoa.instances.SingleClassInstanceData; import org.apache.samoa.learners.InstanceContentEvent; /** - * Sample class for serializing and deserializing InsatnceContentEvent from/to JSON format + * Sample class for serializing and deserializing {@link InstanceContentEvent} from/to JSON format * @author pwawrzyniak * @version 0.5.0-incubating-SNAPSHOT * @since 0.5.0-incubating @@ -35,7 +63,7 @@ public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, * @param charset Charset to be used for bytes parsing */ public KafkaJsonMapper(Charset charset){ - this.gson = new Gson(); + this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCreator()).create(); this.charset = charset; } @@ -49,4 +77,13 @@ public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, return gson.toJson(message).getBytes(this.charset); } + public class InstanceDataCreator implements InstanceCreator<InstanceData>{ + + @Override + public InstanceData createInstance(Type type) { + return new SingleClassInstanceData(); + } + + } + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java index a8cc0b8..ad6bd8e 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java @@ -13,7 +13,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.samoa.streams.kafka; +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import org.apache.samoa.core.ContentEvent; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java index 24783d4..f5227d3 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -15,19 +15,45 @@ */ package org.apache.samoa.streams.kafka; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import java.util.logging.Level; +import java.util.logging.Logger; /** - * Internal class responsible for Kafka Stream handling (both consume and produce) + * Internal class responsible for Kafka Stream handling (both consume and + * produce) * * @author pwawrzyniak * @version 0.5.0-incubating-SNAPSHOT @@ -36,24 +62,25 @@ import org.apache.kafka.clients.producer.ProducerRecord; class KafkaUtils { // Consumer class for internal use to retrieve messages from Kafka - private KafkaConsumer<String, byte[]> consumer; + private transient KafkaConsumer<String, byte[]> consumer; - private KafkaProducer<String, byte[]> producer; + private transient KafkaProducer<String, byte[]> producer; // Properties of the consumer, as defined in Kafka documentation private final Properties consumerProperties; private final Properties producerProperties; // Timeout for Kafka Consumer - private int consumerTimeout; + private long consumerTimeout; /** * Class constructor + * * @param consumerProperties Properties of consumer * @param producerProperties Properties of producer * @param consumerTimeout Timeout for consumer poll requests */ - public KafkaUtils(Properties consumerProperties, Properties producerProperties, int consumerTimeout) { + public KafkaUtils(Properties consumerProperties, Properties producerProperties, long consumerTimeout) { this.consumerProperties = consumerProperties; this.producerProperties = producerProperties; this.consumerTimeout = consumerTimeout; @@ -66,7 +93,9 @@ class KafkaUtils { } /** - * Method used to initialize Kafka Consumer, i.e. instantiate it and subscribe to configured topic + * Method used to initialize Kafka Consumer, i.e. instantiate it and + * subscribe to configured topic + * * @param topics List of Kafka topics that consumer should subscribe to */ public void initializeConsumer(Collection<String> topics) { @@ -75,19 +104,29 @@ class KafkaUtils { consumer = new KafkaConsumer<>(consumerProperties); } consumer.subscribe(topics); +// consumer.seekToBeginning(consumer.assignment()); + } + + public void closeConsumer() { + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); + } } - public void initializeProducer(){ + public void initializeProducer() { // lazy instantiation - if(producer==null){ + if (producer == null) { producer = new KafkaProducer<>(producerProperties); - } + } } - + /** * Method for reading new messages from Kafka topics + * * @return Collection of read messages - * @throws Exception Exception is thrown when consumer was not initialized or is not subscribed to any topic. + * @throws Exception Exception is thrown when consumer was not initialized + * or is not subscribed to any topic. */ public List<byte[]> getKafkaMessages() throws Exception { @@ -107,16 +146,24 @@ class KafkaUtils { private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) { Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator(); List<byte[]> ret = new ArrayList<>(); - while(iterator.hasNext()){ + while (iterator.hasNext()) { ret.add(iterator.next().value()); } return ret; } - - public void sendKafkaMessage(String topic, byte[] message){ - if(producer!=null){ - producer.send(new ProducerRecord<String, byte[]>(topic, message)); + + public long sendKafkaMessage(String topic, byte[] message) { + if (producer != null) { + try{ + ProducerRecord<String, byte[]> record = new ProducerRecord(topic, message); + long offset = producer.send(record).get(10, TimeUnit.SECONDS).offset(); producer.flush(); + return offset; + } catch(InterruptedException | ExecutionException | TimeoutException e){ + Logger.getLogger(KafkaUtils.class.getName()).log(Level.SEVERE, null, e); + } + } + return -1; } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java new file mode 100644 index 0000000..2a92a31 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -0,0 +1,212 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import mockit.Mocked; +import mockit.Tested; +import mockit.Expectations; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.learners.InstanceContentEvent; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import org.apache.kafka.common.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samoa.instances.Attribute; +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.moa.core.FastVector; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.streams.InstanceStream; + +/** + * + * @author pwawrzyniak + */ +public class KafkaEntranceProcessorTest { + +// @Tested +// private KafkaEntranceProcessor kep; + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC = "test"; + private static final int NUM_INSTANCES = 500; + + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + + public KafkaEntranceProcessorTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topic + AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + + @Test + public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException { + + Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); + Properties props = TestUtilsForKafka.getConsumerProperties(); + props.setProperty("auto.offset.reset", "earliest"); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 10000, new KafkaJsonMapper(Charset.defaultCharset())); + kep.onCreate(1); + +// prepare new thread for data producing + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties()); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 0; + for (i = 0; i < NUM_INSTANCES; i++) { + try { + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); + long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); + Thread.sleep(5); + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + + int z = 0; + while (kep.hasNext() && z < NUM_INSTANCES) { + logger.log(Level.INFO, "{0} {1}", new Object[]{z++, kep.nextEvent().toString()}); + } + + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + + + } + +// private Properties getProducerProperties() { +// Properties producerProps = new Properties(); +//// props.setProperty("zookeeper.connect", zkConnect); +// producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); +// producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); +// producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); +//// producerProps.setProperty("group.id", "test"); +// return producerProps; +// } +// +// private Properties getConsumerProperties() { +// Properties consumerProps = new Properties(); +// consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); +// consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); +// consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); +//// consumerProps.setProperty("group.id", "test"); +// consumerProps.setProperty("group.id", "group0"); +// consumerProps.setProperty("client.id", "consumer0"); +// return consumerProps; + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java new file mode 100644 index 0000000..4cd5135 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -0,0 +1,235 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.instances.InstancesHeader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * + * @author pwawrzyniak + */ +public class KafkaUtilsTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_R = "test-r"; + private static final String TOPIC_S = "test-s"; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + private Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); + private long CONSUMER_TIMEOUT = 1000; + + public KafkaUtilsTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + /** + * Test of initializeConsumer method, of class KafkaUtils. + */ + @Test + public void testInitializeConsumer() throws Exception { + logger.log(Level.INFO, "initializeConsumer"); + Collection<String> topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT); + assertNotNull(instance); + + instance.initializeConsumer(topics); + + assertNotNull(instance.getKafkaMessages()); + instance.closeConsumer(); + } + + /** + * Test of getKafkaMessages method, of class KafkaUtils. + */ + @Test + public void testGetKafkaMessages() throws Exception { + logger.log(Level.INFO, "getKafkaMessages"); + Collection<String> topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT); + assertNotNull(instance); + + logger.log(Level.INFO, "Initialising consumer"); + instance.initializeConsumer(topics); + + logger.log(Level.INFO, "Produce data"); + List expResult = sendAndGetMessages(500); + + logger.log(Level.INFO, "Get results from Kafka"); + List<byte[]> result = instance.getKafkaMessages(); + + assertArrayEquals(expResult.toArray(), result.toArray()); + instance.closeConsumer(); + } + + private List<byte[]> sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException { + List<byte[]> ret; + try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test"))) { + ret = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 0; + for (i = 0; i < maxNum; i++) { + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); + ret.add(record.value()); + producer.send(record); + } producer.flush(); + } + return ret; + } + + /** + * Test of sendKafkaMessage method, of class KafkaUtils. + */ + @Test + public void testSendKafkaMessage() { + logger.log(Level.INFO, "sendKafkaMessage"); + + logger.log(Level.INFO, "Initialising producer"); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties("rcv-test"), CONSUMER_TIMEOUT); + instance.initializeProducer(); + + logger.log(Level.INFO, "Initialising consumer"); + KafkaConsumer<String, byte[]> consumer; + consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties()); + consumer.subscribe(Arrays.asList(TOPIC_S)); + + logger.log(Level.INFO, "Produce data"); + List<byte[]> sent = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + for (int i = 0; i < 500; i++) { + byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); + sent.add(val); + instance.sendKafkaMessage(TOPIC_S, val); + } + + logger.log(Level.INFO, "Get results from Kafka"); + ConsumerRecords<String, byte[]> records = consumer.poll(CONSUMER_TIMEOUT); + Iterator<ConsumerRecord<String, byte[]>> it = records.iterator(); + List<byte[]> consumed = new ArrayList<>(); + while (it.hasNext()) { + consumed.add(it.next().value()); + } + consumer.close(); + + assertArrayEquals(sent.toArray(), consumed.toArray()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java new file mode 100644 index 0000000..0d30429 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java @@ -0,0 +1,132 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.Properties; +import java.util.Random; +import org.apache.samoa.instances.Attribute; +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.learners.InstanceContentEvent; +import org.apache.samoa.moa.core.FastVector; + +/** + * + * @author pwawrzyniak <your.name at your.org> + */ +public class TestUtilsForKafka { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC = "test"; + + protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { + double[] attVals = new double[numAtts + 1]; + double sum = 0.0; + double sumWeights = 0.0; + for (int i = 0; i < numAtts; i++) { + attVals[i] = instanceRandom.nextDouble(); +// sum += this.weights[i] * attVals[i]; +// sumWeights += this.weights[i]; + } + int classLabel; + if (sum >= sumWeights * 0.5) { + classLabel = 1; + } else { + classLabel = 0; + } + + Instance inst = new DenseInstance(1.0, attVals); + inst.setDataset(header); + inst.setClassValue(classLabel); + + return new InstanceContentEvent(0, inst, true, false); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + protected static InstancesHeader generateHeader(int numAttributes) { + FastVector attributes = new FastVector(); + for (int i = 0; i < numAttributes; i++) { + attributes.addElement(new Attribute("att" + (i + 1))); + } + + FastVector classLabels = new FastVector(); + for (int i = 0; i < numAttributes; i++) { + classLabels.addElement("class" + (i + 1)); + } + attributes.addElement(new Attribute("class", classLabels)); + InstancesHeader streamHeader = new InstancesHeader(new Instances("test-kafka", attributes, 0)); + streamHeader.setClassIndex(streamHeader.numAttributes() - 1); + return streamHeader; + } + + + protected static Properties getProducerProperties() { + return getProducerProperties("test"); + } + + /** + * + * @param clientId + * @return + */ + protected static Properties getProducerProperties(String clientId) { + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.setProperty("group.id", "test"); + producerProps.setProperty("client.id", clientId); + return producerProps; + } + + protected static Properties getConsumerProperties() { + Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.put("enable.auto.commit", "true"); + consumerProps.put("auto.commit.interval.ms", "1000"); + consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.setProperty("group.id", "test"); + consumerProps.setProperty("auto.offset.reset", "earliest"); +// consumerProps.setProperty("client.id", "consumer0"); + return consumerProps; + } +}
