Initial structure of Kafka components. Initial code for Kafka Consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/4453f1f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/4453f1f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/4453f1f1 Branch: refs/heads/master Commit: 4453f1f10df1ebe93dff7ae831162714ad37a9a3 Parents: 26c2191 Author: pwawrzyniak <[email protected]> Authored: Tue Mar 14 17:43:25 2017 +0100 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- samoa-api/pom.xml | 208 ++++++++++--------- .../samoa/streams/kafka/KafkaDeserializer.java | 30 +++ .../kafka/KafkaDestinationProcessor.java | 42 ++++ .../streams/kafka/KafkaEntranceProcessor.java | 65 ++++++ .../samoa/streams/kafka/KafkaSerializer.java | 31 +++ .../apache/samoa/streams/kafka/KafkaUtils.java | 71 +++++++ 6 files changed, 346 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 9f69e20..4621b93 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -11,119 +11,125 @@ http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - #L% - --> +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +#L% +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> - <name>samoa-api</name> - <description>API and algorithms for SAMOA</description> + <name>samoa-api</name> + <description>API and algorithms for SAMOA</description> - <artifactId>samoa-api</artifactId> - <parent> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa</artifactId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> + <artifactId>samoa-api</artifactId> + <parent> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> - <dependencies> - <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics-core.version}</version> - </dependency> + <dependencies> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics-core.version}</version> + </dependency> - <dependency> - <groupId>net.jcip</groupId> - <artifactId>jcip-annotations</artifactId> - <version>${jcip-annotations.version}</version> - </dependency> + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + <version>${jcip-annotations.version}</version> + </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + 
<artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> - <dependency> - <groupId>com.github.javacliparser</groupId> - <artifactId>javacliparser</artifactId> - <version>${javacliparser.version}</version> - </dependency> + <dependency> + <groupId>com.github.javacliparser</groupId> + <artifactId>javacliparser</artifactId> + <version>${javacliparser.version}</version> + </dependency> - <dependency> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa-instances</artifactId> - <version>${project.version}</version> - </dependency> + <dependency> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa-instances</artifactId> + <version>${project.version}</version> + </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - <version>${kryo.version}</version> - </dependency> + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> - <dependency> - <groupId>com.dreizak</groupId> - <artifactId>miniball</artifactId> - <version>${miniball.version}</version> - </dependency> + <dependency> + <groupId>com.dreizak</groupId> + <artifactId>miniball</artifactId> + <version>${miniball.version}</version> + </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - 
<version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - </dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.0</version> + </dependency> + </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>${maven-dependency-plugin.version}</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>false</overWriteSnapshots> - <overWriteIfNewer>true</overWriteIfNewer> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>${maven-dependency-plugin.version}</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> 
</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java new file mode 100644 index 0000000..2c7dae1 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param <T> the class that would be deserialized + */ +public interface KafkaDeserializer<T extends ContentEvent> { + + // TODO: Consider key-value schema? 
+ + T deserialize(byte[] message); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java new file mode 100644 index 0000000..ed8f164 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; + +/** + * + * @author pwawrzyniak + */ +public class KafkaDestinationProcessor implements Processor { + + @Override + public boolean process(ContentEvent event) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void onCreate(int id) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
+ } + + @Override + public Processor newProcessor(Processor processor) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java new file mode 100644 index 0000000..228e81b --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -0,0 +1,65 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.samoa.streams.kafka; + +import java.util.Properties; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +/** + * + * @author pwawrzyniak + */ +public class KafkaEntranceProcessor implements EntranceProcessor { + + transient private KafkaUtils kafkaUtils; + + public KafkaEntranceProcessor(Properties props, String topic, int batchSize) { + kafkaUtils = new KafkaUtils(props, null, batchSize); + } + + @Override + public void onCreate(int id) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public boolean isFinished() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public boolean hasNext() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public ContentEvent nextEvent() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public boolean process(ContentEvent event) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public Processor newProcessor(Processor processor) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
+ } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java new file mode 100644 index 0000000..29f04ca --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param <T> the class that would be serialized + */ +public interface KafkaSerializer<T extends ContentEvent> { + + // TODO: Consider Key-Value schema? 
+ + + byte[] serialize(T message); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java new file mode 100644 index 0000000..c2fbaa8 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.samoa.streams.kafka; + +import java.util.Collection; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +/** + * Internal class responsible for Kafka Stream handling + * + * @author pwawrzyniak + */ +class KafkaUtils { + + // Consumer class for internal use to retrieve messages from Kafka + private KafkaConsumer<String, byte[]> consumer; + + private KafkaProducer<String, byte[]> producer; + + // Properties of the consumer, as defined in Kafka documentation + private Properties consumerProperties; + private Properties producerProperties; + + // Batch size for Kafka Consumer + private int consumerTimeout; + + public KafkaUtils(Properties consumerProperties, Properties producerProperties, int consumerTimeout) { + this.consumerProperties = consumerProperties; + this.producerProperties = producerProperties; + this.consumerTimeout = consumerTimeout; + } + + public void initializeConsumer(Collection<String> topics) { + // lazy initialization + if (consumer == null) { + consumer = new KafkaConsumer<String, byte[]>(consumerProperties); + } + consumer.subscribe(topics); + } + + public ConsumerRecords<String, byte[]> getMessages() throws Exception { + + if (consumer != null) { + if (!consumer.subscription().isEmpty()) { + return consumer.poll(consumerTimeout); + } else { + // TODO: do it more elegant way + throw new Exception("Consumer subscribed to no topics!"); + } + } else { + // TODO: do more elegant way + throw new Exception("Consumer not initialised"); + } + } +}
