Repository: incubator-samoa Updated Branches: refs/heads/master 17733b5e6 -> 804eac8c0 (forced update)
SAMOA-65: Apache Kafka integration components for SAMOA Fix #59 Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/804eac8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/804eac8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/804eac8c Branch: refs/heads/master Commit: 804eac8c066e2f177e4f9c4682452fc3735699ea Parents: 26c2191 Author: pwawrzyniak <[email protected]> Authored: Tue Mar 14 17:43:25 2017 +0100 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Wed Jul 26 11:54:31 2017 +0300 ---------------------------------------------------------------------- .gitignore | 31 +-- pom.xml | 2 +- samoa-api/pom.xml | 258 ++++++++++--------- .../streams/kafka/KafkaConsumerThread.java | 174 +++++++++++++ .../samoa/streams/kafka/KafkaDeserializer.java | 51 ++++ .../kafka/KafkaDestinationProcessor.java | 100 +++++++ .../streams/kafka/KafkaEntranceProcessor.java | 126 +++++++++ .../samoa/streams/kafka/KafkaSerializer.java | 52 ++++ .../apache/samoa/streams/kafka/KafkaUtils.java | 142 ++++++++++ .../java/org/apache/samoa/tasks/KafkaTask.java | 199 ++++++++++++++ samoa-api/src/main/resources/kafka.avsc | 106 ++++++++ .../kafka/KafkaDestinationProcessorTest.java | 175 +++++++++++++ .../kafka/KafkaEntranceProcessorTest.java | 185 +++++++++++++ .../samoa/streams/kafka/KafkaUtilsTest.java | 247 ++++++++++++++++++ .../samoa/streams/kafka/OosTestSerializer.java | 77 ++++++ .../samoa/streams/kafka/TestUtilsForKafka.java | 136 ++++++++++ .../org/apache/samoa/utils/SystemsUtils.java | 6 +- 17 files changed, 1935 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 294c718..a834232 100644 --- a/.gitignore 
+++ b/.gitignore @@ -1,15 +1,16 @@ -#maven -target/ - -#eclipse -.classpath -.project -.settings/ - -#DS_Store -.DS_Store - -#intellij -.idea/ -*.iml -*.iws +#maven +target/ + +#eclipse +.classpath +.project +.settings/ + +#DS_Store +.DS_Store + +#intellij +.idea/ +*.iml +*.iws +/samoa-api/nbproject/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ecc713d..90d6a5f 100644 --- a/pom.xml +++ b/pom.xml @@ -131,7 +131,7 @@ <jcip-annotations.version>1.0</jcip-annotations.version> <jmockit.version>1.13</jmockit.version> <junit.version>4.10</junit.version> - <kafka.version>0.8.1</kafka.version> + <kafka.version>0.10.2.0</kafka.version> <kryo.version>2.21</kryo.version> <metrics-core.version>2.2.0</metrics-core.version> <miniball.version>1.0.3</miniball.version> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 9f69e20..e2e007a 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -8,122 +8,150 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - #L% - --> + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +#L% +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - - <name>samoa-api</name> - <description>API and algorithms for SAMOA</description> - - <artifactId>samoa-api</artifactId> - <parent> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa</artifactId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> - - <dependencies> - <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics-core.version}</version> - </dependency> - - <dependency> - <groupId>net.jcip</groupId> - <artifactId>jcip-annotations</artifactId> - <version>${jcip-annotations.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - </dependency> - - <dependency> - <groupId>com.github.javacliparser</groupId> - <artifactId>javacliparser</artifactId> - <version>${javacliparser.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa-instances</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - <version>${kryo.version}</version> - </dependency> - - <dependency> - <groupId>com.dreizak</groupId> - <artifactId>miniball</artifactId> - <version>${miniball.version}</version> - </dependency> - - <dependency> - 
<groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>${maven-dependency-plugin.version}</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>false</overWriteSnapshots> - <overWriteIfNewer>true</overWriteIfNewer> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-api</name> + <description>API and algorithms for SAMOA</description> + + <artifactId>samoa-api</artifactId> + <parent> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics-core.version}</version> + </dependency> + + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + <version>${jcip-annotations.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + 
</dependency> + + <dependency> + <groupId>com.github.javacliparser</groupId> + <artifactId>javacliparser</artifactId> + <version>${javacliparser.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa-instances</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> + + <dependency> + <groupId>com.dreizak</groupId> + <artifactId>miniball</artifactId> + <version>${miniball.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka.version}</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> 
+ <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>${maven-dependency-plugin.version}</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java new file mode 100644 index 0000000..fbd3ec6 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java @@ -0,0 +1,174 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +/** + * + * @author pwawrzyniak + */ +class KafkaConsumerThread extends Thread { + + // Consumer class for internal use to retrieve messages from Kafka + private transient KafkaConsumer<String, byte[]> consumer; + + private Logger log = Logger.getLogger(KafkaConsumerThread.class.getName()); + + private final Properties consumerProperties; + private final Collection<String> topics; + private final long consumerTimeout; + private final List<byte[]> buffer; + // used to synchronize things + private final Object lock; + private boolean running; + + /** + * Class constructor + * + * @param consumerProperties Properties of Consumer + * @param topics Topics to fetch (subscribe) + * @param consumerTimeout Timeout for data polling + */ + KafkaConsumerThread(Properties consumerProperties, Collection<String> topics, long consumerTimeout) { + this.running = false; + this.consumerProperties = consumerProperties; + this.topics = topics; + this.consumerTimeout = consumerTimeout; + this.buffer = new ArrayList<>(); + lock = new Object(); + } + + @Override + public void run() { + + initializeConsumer(); + + while (running) { + fetchDataFromKafka(); + } + + cleanUp(); + } 
+ + /** + * Method for fetching data from Apache Kafka. It takes care of received + * data + */ + private void fetchDataFromKafka() { + if (consumer != null) { + if (!consumer.subscription().isEmpty()) { + try { + List<byte[]> kafkaMsg = getMessagesBytes(consumer.poll(consumerTimeout)); + fillBufferAndNotifyWaits(kafkaMsg); + } catch (Throwable t) { + Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, t); + } + } + } + } + + /** + * Copies received messages to class buffer and notifies Processor to grab + * the data. + * + * @param kafkaMsg Messages received from Kafka + */ + private void fillBufferAndNotifyWaits(List<byte[]> kafkaMsg) { + synchronized (lock) { + buffer.addAll(kafkaMsg); + if (buffer.size() > 0) { + lock.notifyAll(); + } + } + } + + private void cleanUp() { + // clean resources + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); + } + } + + private void initializeConsumer() { + // lazy instantiation + log.log(Level.INFO, "Instantiating Kafka consumer"); + if (consumer == null) { + consumer = new KafkaConsumer<>(consumerProperties); + running = true; + } + consumer.subscribe(topics); + } + + private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) { + Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator(); + List<byte[]> ret = new ArrayList<>(); + while (iterator.hasNext()) { + ret.add(iterator.next().value()); + } + return ret; + } + + void close() { + running = false; + } + + List<byte[]> getKafkaMessages() { + synchronized (lock) { + if (buffer.isEmpty()) { + try { + // block the call until new messages are received + lock.wait(); + } catch (InterruptedException ex) { + Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, ex); + } + } + ArrayList<byte[]> ret = new ArrayList<>(); + // copy buffer to return list + ret.addAll(buffer); + // clear message buffer + buffer.clear(); + return ret; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java new file mode 100644 index 0000000..459c491 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param <T> the class that would be deserialized + */ +public interface KafkaDeserializer<T extends ContentEvent> { + + // TODO: Consider key-value schema? + /** + * Method that provides deserialization algorithm + * @param message Message as received from Apache Kafka + * @return Deserialized form of message, to be passed to topology + */ + T deserialize(byte[] message); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java new file mode 100644 index 0000000..231e25d --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; + +/** + * Destination processor that writes data to Apache Kafka + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaDestinationProcessor implements Processor { + + @Override + protected void finalize() throws Throwable { + super.finalize(); + kafkaUtils.closeProducer(); + } + + private final KafkaUtils kafkaUtils; + private final String topic; + private final KafkaSerializer serializer; + + /** + * Class constructor + * @param props Properties of Kafka Producer + * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> + * @param topic Topic this destination processor will write into + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + */ + public KafkaDestinationProcessor(Properties props, String topic, KafkaSerializer serializer) { + this.kafkaUtils = new KafkaUtils(null, props, 0); + this.topic = topic; + this.serializer = serializer; + } + + private KafkaDestinationProcessor(KafkaUtils kafkaUtils, String topic, KafkaSerializer serializer){ + this.kafkaUtils = kafkaUtils; + this.topic = topic; + this.serializer = serializer; + } + + @Override + public boolean process(ContentEvent event) { + try { + kafkaUtils.sendKafkaMessage(topic, serializer.serialize(event)); + } catch (Exception ex) { + 
Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + return false; + } + return true; + } + + @Override + public void onCreate(int id) { + kafkaUtils.initializeProducer(); + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaDestinationProcessor kdp = (KafkaDestinationProcessor)processor; + return new KafkaDestinationProcessor(new KafkaUtils(kdp.kafkaUtils), kdp.topic, kdp.serializer); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java new file mode 100644 index 0000000..866a457 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +/** + * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a> + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaEntranceProcessor implements EntranceProcessor { + + transient private final KafkaUtils kafkaUtils; + private List<byte[]> buffer; + private final KafkaDeserializer deserializer; + private final String topic; + + /** + * Class constructor + * @param props Properties of Kafka consumer + * @see <a href="https://kafka.apache.org/documentation/#newconsumerconfigs"> Apache Kafka consumer configuration</a> + * @param topic Topic from which the messages should be read + * @param timeout Timeout used when polling Kafka for new messages + * @param deserializer Instance of the implementation of {@link KafkaDeserializer} + */ + public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) { + this.kafkaUtils = new KafkaUtils(props, null, timeout); + this.deserializer = deserializer; + this.topic = topic; + } + + private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) { + this.kafkaUtils = kafkaUtils; + this.deserializer = 
deserializer; + this.topic = topic; + } + + @Override + public void onCreate(int id) { + this.buffer = new ArrayList<>(100); + this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic)); + } + + @Override + public boolean isFinished() { + return false; + } + + @Override + public boolean hasNext() { + if (buffer.isEmpty()) { + try { + buffer.addAll(kafkaUtils.getKafkaMessages()); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + } + } + return buffer.size() > 0; + } + + @Override + public ContentEvent nextEvent() { + // assume this will never be called when buffer is empty! + return this.deserializer.deserialize(buffer.remove(0)); + } + + @Override + public boolean process(ContentEvent event) { + return false; + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; + return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic); + } + + @Override + protected void finalize() throws Throwable { + kafkaUtils.closeConsumer(); + super.finalize(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java new file mode 100644 index 0000000..2bbc259 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param <T> the class that would be serialized + */ +public interface KafkaSerializer<T extends ContentEvent> { + + // TODO: Consider Key-Value schema? 
+ + /** + * Method that provides serialization algorithm + * @param message Message received from topology, to be serialized + * @return Serialized form of the message + */ + byte[] serialize(T message); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java new file mode 100644 index 0000000..fb3aef7 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -0,0 +1,142 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +/** + * Internal class responsible for Kafka Stream handling (both consume and + * produce) + * + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +class KafkaUtils { + + private transient KafkaConsumerThread kafkaConsumerThread; + + private transient KafkaProducer<String, byte[]> producer; + + // Properties of the consumer, as defined in Kafka documentation + private final Properties consumerProperties; + private final Properties producerProperties; + + // Timeout for Kafka Consumer + private long consumerTimeout; + + + /** + * Class constructor + * + * @param consumerProperties Properties of consumer + * @param producerProperties Properties of producer + * @param consumerTimeout Timeout for consumer poll requests + */ + public KafkaUtils(Properties consumerProperties, Properties producerProperties, long consumerTimeout) { + this.consumerProperties = consumerProperties; + this.producerProperties = producerProperties; + this.consumerTimeout = consumerTimeout; + } + + /** + * Creates new KafkaUtils from existing instance + * @param kafkaUtils Instance of KafkaUtils + */ + KafkaUtils(KafkaUtils kafkaUtils) { + this.consumerProperties = kafkaUtils.consumerProperties; + this.producerProperties = kafkaUtils.producerProperties; + this.consumerTimeout = kafkaUtils.consumerTimeout; + } + + /** + * Method used to initialize Kafka Consumer Thread, i.e. 
instantiate it and + * subscribe to configured topic + * + * @param topics List of Kafka topics that consumer should subscribe to + */ + public void initializeConsumer(Collection<String> topics) { + kafkaConsumerThread = new KafkaConsumerThread(consumerProperties, topics, consumerTimeout); + kafkaConsumerThread.start(); + } + + public void closeConsumer() { + kafkaConsumerThread.close(); + } + + public void initializeProducer() { + // lazy instantiation + if (producer == null) { + producer = new KafkaProducer<>(producerProperties); + } + } + + public void closeProducer(){ + if(producer != null){ + producer.close(1, TimeUnit.MINUTES); + } + } + + /** + * Method for reading new messages from Kafka topics + * + * @return Collection of read messages + * @throws Exception Exception is thrown when consumer was not initialized + * or is not subscribed to any topic. + */ + public List<byte[]> getKafkaMessages() throws Exception { + return kafkaConsumerThread.getKafkaMessages(); + } + + public long sendKafkaMessage(String topic, byte[] message) { + if (producer != null) { + try{ + ProducerRecord<String, byte[]> record = new ProducerRecord(topic, message); + long offset = producer.send(record).get(10, TimeUnit.SECONDS).offset(); + producer.flush(); + return offset; + } catch(InterruptedException | ExecutionException | TimeoutException e){ + Logger.getLogger(KafkaUtils.class.getName()).log(Level.SEVERE, null, e); + } + + } + return -1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java new file mode 100644 index 0000000..f0597a8 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 
package org.apache.samoa.tasks;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.samoa.streams.kafka.KafkaDeserializer;
import org.apache.samoa.streams.kafka.KafkaDestinationProcessor;
import org.apache.samoa.streams.kafka.KafkaEntranceProcessor;
import org.apache.samoa.streams.kafka.KafkaSerializer;
import org.apache.samoa.topology.ComponentFactory;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.Topology;
import org.apache.samoa.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.javacliparser.ClassOption;
import com.github.javacliparser.Configurable;
import com.github.javacliparser.IntOption;
import com.github.javacliparser.StringOption;

/**
 * Kafka task: builds a SAMOA topology whose entrance processor consumes
 * events from a Kafka topic and whose destination processor writes the
 * processed events back to another Kafka topic.
 *
 * @author Jakub Jankowski
 * @version 0.5.0-incubating-SNAPSHOT
 * @since 0.5.0-incubating
 *
 */
public class KafkaTask implements Task, Configurable {

  private static final long serialVersionUID = 3984474041982397855L;
  private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);

  // Consumer/producer configuration; either injected via the test constructor
  // or derived from the CLI options in init()
  Properties producerProps;
  Properties consumerProps;
  int timeout;
  private KafkaDeserializer deserializer;
  private KafkaSerializer serializer;
  private String inTopic;
  private String outTopic;

  private TopologyBuilder builder;
  private Topology kafkaTopology;

  public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p',
      "Number of destination Processors", 1, 1, Integer.MAX_VALUE);

  public IntOption timeoutOption = new IntOption("timeout", 't',
      "Kafka consumer timeout", 1, 1, Integer.MAX_VALUE);

  // FIX: the original defaults for both broker options were "inputTopic" — a
  // copy-paste of the topic default, which is not a valid bootstrap address.
  // Use a conventional local broker address instead.
  public StringOption inputBrokerOption = new StringOption("inputBroker", 'r', "Input brokers addresses",
      "localhost:9092");

  public StringOption outputBrokerOption = new StringOption("outputBroker", 's', "Output brokers name",
      "localhost:9092");

  public StringOption inputTopicOption = new StringOption("inputTopic", 'i', "Input topic name",
      "inputTopic");

  public StringOption outputTopicOption = new StringOption("outputTopic", 'o', "Output topic name",
      "outputTopic");

  public ClassOption serializerOption = new ClassOption("serializer", 'w',
      "Serializer class name",
      KafkaSerializer.class, KafkaSerializer.class.getName());

  public ClassOption deserializerOption = new ClassOption("deserializer", 'd',
      "Deserializer class name",
      KafkaDeserializer.class, KafkaDeserializer.class.getName());

  public StringOption taskNameOption = new StringOption("taskName", 'n', "Identifier of the task",
      "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));

  /**
   * Class constructor (for tests purposes)
   *
   * @param producerProps Properties of Kafka Producer
   * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka
   * Producer configuration</a>
   * @param consumerProps Properties of Kafka Consumer
   * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka
   * Consumer configuration</a>
   * @param inTopic Topic from which the entrance processor will read
   * @param outTopic Topic to which destination processor will write into
   * @param timeout Timeout used when polling Kafka for new messages
   * @param serializer Implementation of KafkaSerializer that handles arriving
   * data serialization
   * @param deserializer Implementation of KafkaDeserializer that handles
   * arriving data deserialization
   */
  public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) {
    this.producerProps = producerProps;
    this.consumerProps = consumerProps;
    this.deserializer = deserializer;
    this.serializer = serializer;
    this.inTopic = inTopic;
    this.outTopic = outTopic;
    this.timeout = timeout;
  }

  /**
   * Class constructor
   */
  public KafkaTask() {

  }

  @Override
  public void init() {
    // Derive the runtime configuration from the CLI options. Note: when the
    // test constructor was used, these overwrite the injected values — the
    // original behaved the same way.
    producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", outputBrokerOption.getValue());

    consumerProps = new Properties();
    consumerProps.setProperty("bootstrap.servers", inputBrokerOption.getValue());

    serializer = serializerOption.getValue();

    deserializer = deserializerOption.getValue();

    inTopic = inputTopicOption.getValue();
    outTopic = outputTopicOption.getValue();

    timeout = timeoutOption.getValue();

    logger.info("Invoking init");
    if (builder == null) {
      builder = new TopologyBuilder();
      logger.info("Successfully instantiating TopologyBuilder");

      builder.initTopology(taskNameOption.getValue());
      logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue());
    }

    // create entrance processor (reads from Kafka)
    KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer);
    builder.addEntranceProcessor(sourceProcessor);

    // create stream connecting entrance and destination processors
    Stream stream = builder.createStream(sourceProcessor);

    // create destination processor (writes back to Kafka)
    KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, outTopic, serializer);
    builder.addProcessor(destProcessor, kafkaParallelismOption.getValue());
    builder.connectInputShuffleStream(stream, destProcessor);

    // build topology
    kafkaTopology = builder.build();
    logger.info("Successfully built the topology");
  }

  @Override
  public Topology getTopology() {
    return kafkaTopology;
  }

  @Override
  public void setFactory(ComponentFactory factory) {
    logger.info("Invoking setFactory: " + factory.toString());
    builder = new TopologyBuilder(factory);
    logger.info("Successfully instantiating TopologyBuilder");

    builder.initTopology(taskNameOption.getValue());
    logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue());

  }

}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/resources/kafka.avsc ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/resources/kafka.avsc b/samoa-api/src/main/resources/kafka.avsc new file mode 100644 index 0000000..f5f12cf --- /dev/null +++ b/samoa-api/src/main/resources/kafka.avsc @@ -0,0 +1,106 @@ +[ +{ + "namespace": "org.apache.samoa.streams.kafka.temp", + "type": "record", + "name": "BurrTest", + "fields": [ + {"name":"name", "type": "string"}, + {"name":"atrs", "type": {"type": "array", "items": "string"}}, + {"name":"nums", "type": {"type": "array", "items": "int"}}, + {"name":"list", "type": {"type": "array", "items": "string"}} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "Instance", + "fields": [ + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "InstanceData", + "fields": [ + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "SingleClassInstanceData", + "fields": [ + {"name":"classValue", "type": "double"} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "DenseInstanceData", + "fields": [ + {"name":"attributeValues", "type": {"type": "array", "items": "double"}} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "SparseInstanceData", + "fields": [ + {"name":"attributeValues", "type": {"type": "array", "items": "double"}}, + {"name":"indexValues", "type": {"type": "array", "items": "int"}}, + {"name":"numberAttributes", "type": "int"} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "SingleLabelInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", 
"org.apache.samoa.instances.SingleClassInstanceData"]}, + {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "DenseInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}, + {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]} + ] +}, +{ + "namespace": "org.apache.samoa.core", + "type": "record", + "name": "SerializableInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}, + {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]} + ] +}, +{ + "namespace": "org.apache.samoa.learners", + "type": "record", + "name": "InstanceContent", + "fields": [ + {"name": "instanceIndex", "type": "long"}, + {"name": "classifierIndex", "type": "int"}, + {"name": "evaluationIndex", "type": "int"}, + {"name":"instance", "type":"org.apache.samoa.core.SerializableInstance"}, + {"name": "isTraining", "type": "boolean"}, + {"name": "isTesting", "type": "boolean"}, + {"name": 
"isLast", "type": "boolean"} + ] +}, +{ + "namespace": "org.apache.samoa.learners", + "type": "record", + "name": "InstanceContentEvent", + "fields": [ + {"name": "instanceContent", "type": "org.apache.samoa.learners.InstanceContent"} + ] +} +] + http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java new file mode 100644 index 0000000..930ab23 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java @@ -0,0 +1,175 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package org.apache.samoa.streams.kafka;

import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Time;
import org.apache.samoa.instances.InstancesHeader;
import org.apache.samoa.learners.InstanceContentEvent;
import org.junit.After;
import org.junit.AfterClass;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Integration test for {@link KafkaDestinationProcessor}: spins up an
 * embedded ZooKeeper + Kafka broker, pushes instances through the processor
 * and verifies that every instance arrives on the target topic.
 *
 * @author pwawrzyniak
 */
public class KafkaDestinationProcessorTest {

    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String TOPIC = "test-kdp";
    private static final int NUM_INSTANCES = 11111;
    private static final int CONSUMER_TIMEOUT = 1000;

    private static KafkaServer kafkaServer;
    private static EmbeddedZookeeper zkServer;
    private static ZkClient zkClient;
    private static String zkConnect;

    public KafkaDestinationProcessorTest() {
    }

    @BeforeClass
    public static void setUpClass() throws IOException {
        // setup Zookeeper
        zkServer = new EmbeddedZookeeper();
        zkConnect = ZKHOST + ":" + zkServer.port();
        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

        // setup Broker
        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
        KafkaConfig config = new KafkaConfig(brokerProps);
        Time mock = new MockTime();
        kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

    }

    @AfterClass
    public static void tearDownClass() {
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }

    @Before
    public void setUp() throws IOException {

    }

    @After
    public void tearDown() {

    }

    @Test
    public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException {

        Properties props = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
        props.setProperty("auto.offset.reset", "earliest");
        KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new OosTestSerializer());
        kdp.onCreate(1);

        // Thread-safe counter shared with the reader thread. The original used
        // an unsynchronized int[1], which carries no cross-thread visibility
        // guarantee.
        final AtomicInteger received = new AtomicInteger(0);

        // prepare new thread for data receiving
        Thread th = new Thread(new Runnable() {
            @Override
            public void run() {
                KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT));
                consumer.subscribe(Arrays.asList(TOPIC));
                try {
                    while (received.get() < NUM_INSTANCES) {
                        ConsumerRecords<String, byte[]> cr = consumer.poll(CONSUMER_TIMEOUT);
                        received.addAndGet(cr.count());
                    }
                } catch (Exception ex) {
                    Logger.getLogger(KafkaDestinationProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
                } finally {
                    consumer.close();
                }
            }
        });
        th.start();

        Random r = new Random();
        InstancesHeader header = TestUtilsForKafka.generateHeader(10);

        int sent;
        for (sent = 0; sent < NUM_INSTANCES; sent++) {
            InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
            kdp.process(event);
        }

        // Wait (bounded) for the reader thread to drain the topic. Joining the
        // thread replaces the original fixed Thread.sleep(), which raced with
        // the consumer and could assert before all records arrived.
        th.join(60 * CONSUMER_TIMEOUT);
        assertEquals("Number of sent and received instances", sent, received.get());
    }
}
package org.apache.samoa.streams.kafka;

import com.google.gson.Gson;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.samoa.learners.InstanceContentEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.common.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samoa.instances.InstancesHeader;

/**
 * Integration test for {@link KafkaEntranceProcessor}: spins up an embedded
 * ZooKeeper + Kafka broker, produces serialized instances on a topic and
 * verifies that the entrance processor surfaces every one of them as an
 * event.
 *
 * @author pwawrzyniak
 * @author Jakub Jankowski
 */
public class KafkaEntranceProcessorTest {

    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String TOPIC_OOS = "samoa_test-oos";
    private static final int NUM_INSTANCES = 11111;

    private static KafkaServer kafkaServer;
    private static EmbeddedZookeeper zkServer;
    private static ZkClient zkClient;
    private static String zkConnect;
    private static final int TIMEOUT = 1000;

    public KafkaEntranceProcessorTest() {
    }

    @BeforeClass
    public static void setUpClass() throws IOException {
        // setup Zookeeper
        zkServer = new EmbeddedZookeeper();
        zkConnect = ZKHOST + ":" + zkServer.port();
        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

        // setup Broker
        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
        KafkaConfig config = new KafkaConfig(brokerProps);
        Time mock = new MockTime();
        kafkaServer = TestUtils.createServer(config, mock);

        // create topics
        AdminUtils.createTopic(zkUtils, TOPIC_OOS, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

    }

    @AfterClass
    public static void tearDownClass() {
        try {
            kafkaServer.shutdown();
            zkClient.close();
            zkServer.shutdown();
        } catch (Exception ex) {
            Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Before
    public void setUp() throws IOException {

    }

    @After
    public void tearDown() {

    }

    @Test
    public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException {

        final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
        logger.log(Level.INFO, "OOS");
        logger.log(Level.INFO, "testFetchingNewData");
        Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
        props.setProperty("auto.offset.reset", "earliest");
        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_OOS, TIMEOUT, new OosTestSerializer());

        kep.onCreate(1);

        // prepare new thread for data producing
        Thread th = new Thread(new Runnable() {
            @Override
            public void run() {
                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));

                Random r = new Random();
                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
                OosTestSerializer serializer = new OosTestSerializer();
                for (int i = 0; i < NUM_INSTANCES; i++) {
                    try {
                        InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);

                        // parameterized ProducerRecord<> (the original used a raw type)
                        ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC_OOS, serializer.serialize(event));
                        // block until acknowledged so delivery order/failures are visible
                        producer.send(record).get(10, TimeUnit.SECONDS);
                    } catch (InterruptedException ex) {
                        // restore interrupt status instead of silently swallowing it
                        Thread.currentThread().interrupt();
                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
                    } catch (ExecutionException | TimeoutException ex) {
                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
                producer.flush();
                producer.close();
            }
        });
        th.start();

        int z = 0;
        while (z < NUM_INSTANCES && kep.hasNext()) {
            InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
            z++;
        }

        assertEquals("Number of sent and received instances", NUM_INSTANCES, z);

    }
}
mode 100644 index 0000000..186d97b --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -0,0 +1,247 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.instances.InstancesHeader; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + * @author pwawrzyniak + */ +public class KafkaUtilsTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_R = "test-r"; + private static final String TOPIC_S = "test-s"; + private static final int NUM_INSTANCES = 50; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); + 
private final long CONSUMER_TIMEOUT = 1500; + + public KafkaUtilsTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + /** + * Test of initializeConsumer method, of class KafkaUtils. 
+ */ + @Test + public void testInitializeConsumer() throws Exception { + logger.log(Level.INFO, "initializeConsumer"); + Collection<String> topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT); + assertNotNull(instance); + + instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + + Thread.sleep(CONSUMER_TIMEOUT); + + instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + assertTrue(true); + } + + /** + * Test of getKafkaMessages method, of class KafkaUtils. + */ + @Test + public void testGetKafkaMessages() throws Exception { + logger.log(Level.INFO, "getKafkaMessages"); + Collection<String> topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT); + assertNotNull(instance); + + logger.log(Level.INFO, "Initialising consumer"); + instance.initializeConsumer(topics); + + logger.log(Level.INFO, "Produce data"); + List expResult = sendAndGetMessages(NUM_INSTANCES); + + logger.log(Level.INFO, "Wait a moment"); + Thread.sleep(CONSUMER_TIMEOUT); + + logger.log(Level.INFO, "Get results from Kafka"); + List<byte[]> result = instance.getKafkaMessages(); + + assertArrayEquals(expResult.toArray(), result.toArray()); + instance.closeConsumer(); + } + + private List<byte[]> sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException { + List<byte[]> ret; + try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test", BROKERHOST, BROKERPORT))) { + ret = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 
0; + for (i = 0; i < maxNum; i++) { + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); + ret.add(record.value()); + producer.send(record); + } + producer.flush(); + } + return ret; + } + + /** + * Test of sendKafkaMessage method, of class KafkaUtils. + * + * @throws java.lang.InterruptedException + */ + @Test + public void testSendKafkaMessage() throws InterruptedException { + logger.log(Level.INFO, "sendKafkaMessage"); + + logger.log(Level.INFO, "Initialising producer"); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT); + instance.initializeProducer(); + + logger.log(Level.INFO, "Initialising consumer"); + KafkaConsumer<String, byte[]> consumer; + consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT)); + consumer.subscribe(Arrays.asList(TOPIC_S)); + + logger.log(Level.INFO, "Produce data"); + List<byte[]> sent = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + for (int i = 0; i < NUM_INSTANCES; i++) { + byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); + sent.add(val); + instance.sendKafkaMessage(TOPIC_S, val); + } + // wait for Kafka a bit :) + Thread.sleep(2 * CONSUMER_TIMEOUT); + + logger.log(Level.INFO, "Get results from Kafka"); + + List<byte[]> consumed = new ArrayList<>(); + + while (consumed.size() != sent.size()) { + ConsumerRecords<String, byte[]> records = consumer.poll(CONSUMER_TIMEOUT); + Iterator<ConsumerRecord<String, byte[]>> it = records.iterator(); + while (it.hasNext()) { + consumed.add(it.next().value()); + } + } + consumer.close(); + + assertArrayEquals(sent.toArray(), consumed.toArray()); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java new file mode 100644 index 0000000..14535bb --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.learners.InstanceContentEvent; + +/** + * + * @author Piotr Wawrzyniak + */ +public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { + + @Override + public InstanceContentEvent deserialize(byte[] message) { + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message)); + InstanceContentEvent ice = (InstanceContentEvent)ois.readObject(); + return ice; + } catch (IOException | ClassNotFoundException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + @Override + public byte[] serialize(InstanceContentEvent message) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(message); + oos.flush(); + return baos.toByteArray(); + } catch (IOException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java new file mode 100644 index 0000000..8936759 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + + +import java.util.Properties; +import java.util.Random; +import org.apache.samoa.instances.Attribute; +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.learners.InstanceContentEvent; +import org.apache.samoa.moa.core.FastVector; + +/** + * + * @author pwawrzyniak + */ +public class TestUtilsForKafka { + + protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { + double[] attVals = new double[numAtts + 1]; + double sum = 0.0; + double sumWeights = 0.0; + for (int i = 0; i < numAtts; i++) { + attVals[i] = instanceRandom.nextDouble(); + + } + int classLabel; + if (sum >= sumWeights * 0.5) { + classLabel = 1; + } else { + classLabel = 0; + } + + Instance inst = new DenseInstance(1.0, attVals); + inst.setDataset(header); + inst.setClassValue(classLabel); + + return new InstanceContentEvent(0, inst, true, false); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + protected static InstancesHeader generateHeader(int numAttributes) { + FastVector attributes = new FastVector(); + for (int i = 0; i < numAttributes; i++) { + attributes.addElement(new Attribute("att" + (i + 1))); + } + + FastVector classLabels = new FastVector(); + for (int i = 0; i < numAttributes; i++) { + classLabels.addElement("class" + (i + 1)); + } + attributes.addElement(new Attribute("class", classLabels)); + InstancesHeader streamHeader = new InstancesHeader(new Instances("test-kafka", attributes, 0)); + streamHeader.setClassIndex(streamHeader.numAttributes() - 1); + return streamHeader; + } + + + protected static Properties getProducerProperties(String BROKERHOST, String BROKERPORT) { + return getProducerProperties("test", BROKERHOST, BROKERPORT); + } + + /** + * + * @param clientId + * @return + */ + protected static Properties getProducerProperties(String 
clientId, String BROKERHOST, String BROKERPORT) { + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.setProperty("group.id", "test"); + producerProps.setProperty("client.id", clientId); + return producerProps; + } + + protected static Properties getConsumerProperties(String BROKERHOST, String BROKERPORT) { + Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.put("enable.auto.commit", "true"); + consumerProps.put("auto.commit.interval.ms", "1000"); + consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.setProperty("group.id", "test"); + consumerProps.setProperty("auto.offset.reset", "earliest"); + return consumerProps; + } + + protected static Properties getConsumerProducerProperties(String BROKERHOST, String BROKERPORT) { + Properties props = new Properties(); + props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty("group.id", "burrito"); + 
props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("client.id", "burrito"); + return props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java index ad2b383..d1e3a53 100644 --- a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java +++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java @@ -32,7 +32,9 @@ import java.util.Map; import java.util.Properties; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkMarshallingError; @@ -72,7 +74,9 @@ public class SystemsUtils { * Create Kafka topic/stream */ static void createKafkaTopic(String name, int partitions, int replicas) { - AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties()); + // Fix for Apache Kafka 0.10 + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + AdminUtils.createTopic(zkUtils, name, partitions, replicas, new Properties(), RackAwareMode.Disabled$.MODULE$); } static class ZKStringSerializerWrapper implements ZkSerializer {
