Correction in order of messages

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/84c94874
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/84c94874
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/84c94874
Branch: refs/heads/master Commit: 84c94874f6d80fbf2b232beb2f573cd6b478435c Parents: eeb0691 Author: pwawrzyniak <[email protected]> Authored: Wed Jul 5 09:09:19 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- samoa-api/pom.xml | 308 +++++++++---------- .../streams/kafka/KafkaEntranceProcessor.java | 229 +++++++------- 2 files changed, 259 insertions(+), 278 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/84c94874/samoa-api/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 2b7bd22..e1e0b68 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -1,154 +1,154 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - #%L - SAMOA - %% - Copyright (C) 2014 - 2015 Apache Software Foundation - %% - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-#L% ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - - <name>samoa-api</name> - <description>API and algorithms for SAMOA</description> - - <artifactId>samoa-api</artifactId> - <parent> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa</artifactId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> - - <dependencies> - <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics-core.version}</version> - </dependency> - - <dependency> - <groupId>net.jcip</groupId> - <artifactId>jcip-annotations</artifactId> - <version>${jcip-annotations.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - </dependency> - - <dependency> - <groupId>com.github.javacliparser</groupId> - <artifactId>javacliparser</artifactId> - <version>${javacliparser.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa-instances</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - <version>${kryo.version}</version> - </dependency> - - <dependency> - <groupId>com.dreizak</groupId> - <artifactId>miniball</artifactId> - <version>${miniball.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - 
<version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.2.0</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.2.0</version> - <classifier>test</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>0.10.2.0</version> -</dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>0.10.2.0</version> - <classifier>test</classifier> - <scope>test</scope> -</dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>${maven-dependency-plugin.version}</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>false</overWriteSnapshots> - <overWriteIfNewer>true</overWriteIfNewer> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + #%L + SAMOA + %% + Copyright (C) 2014 - 2015 Apache Software Foundation + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +#L% +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-api</name> + <description>API and algorithms for SAMOA</description> + + <artifactId>samoa-api</artifactId> + <parent> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics-core.version}</version> + </dependency> + + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + <version>${jcip-annotations.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + + <dependency> + <groupId>com.github.javacliparser</groupId> + <artifactId>javacliparser</artifactId> + <version>${javacliparser.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.samoa</groupId> + <artifactId>samoa-instances</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + 
<groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> + + <dependency> + <groupId>com.dreizak</groupId> + <artifactId>miniball</artifactId> + <version>${miniball.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.0</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.2.0</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>${maven-dependency-plugin.version}</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + 
</configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/84c94874/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index ea5d06e..83039dc 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -1,124 +1,105 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.core.Processor; - -/** - * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a> - * @author pwawrzyniak - * @version 0.5.0-incubating-SNAPSHOT - * @since 0.5.0-incubating - */ -public class KafkaEntranceProcessor implements EntranceProcessor { - - transient private final KafkaUtils kafkaUtils; - private List<byte[]> buffer; - private final KafkaDeserializer deserializer; - private final String topic; - - /** - * Class constructor - * @param props Properties of Kafka consumer - * @see <a href="https://kafka.apache.org/documentation/#newconsumerconfigs"> Apache Kafka consumer configuration</a> - * @param topic Topic from which the messages should be read - * @param timeout Timeout used when polling Kafka for new messages - * @param deserializer Instance of the implementation of {@link KafkaDeserializer} - */ - public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) { - this.kafkaUtils = new KafkaUtils(props, null, timeout); - this.deserializer = deserializer; - this.topic = topic; - } - - private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) { - this.kafkaUtils = kafkaUtils; - this.deserializer = deserializer; - this.topic = topic; - } - - @Override - public void onCreate(int id) { - this.buffer = new ArrayList<>(100); - this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic)); - } - - @Override - public boolean isFinished() { - return false; - } - - @Override - public boolean hasNext() { - if (buffer.isEmpty()) { - try { - buffer.addAll(kafkaUtils.getKafkaMessages()); - } catch (Exception ex) { - 
Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); - } - } - return buffer.size() > 0; - } - - @Override - public ContentEvent nextEvent() { - // assume this will never be called when buffer is empty! - return this.deserializer.deserialize(buffer.remove(buffer.size() - 1)); - } - - @Override - public boolean process(ContentEvent event) { - return false; - } - - @Override - public Processor newProcessor(Processor processor) { - KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; - return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic); - } - - @Override - protected void finalize() throws Throwable { - kafkaUtils.closeConsumer(); - super.finalize(); //To change body of generated methods, choose Tools | Templates. - } - -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.samoa.streams.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +/** + * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a> + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaEntranceProcessor implements EntranceProcessor { + + transient private final KafkaUtils kafkaUtils; + private List<byte[]> buffer; + private final KafkaDeserializer deserializer; + private final String topic; + + /** + * Class constructor + * @param props Properties of Kafka consumer + * @see <a href="https://kafka.apache.org/documentation/#newconsumerconfigs"> Apache Kafka consumer configuration</a> + * @param topic Topic from which the messages should be read + * @param timeout Timeout used when polling Kafka for new messages + * @param deserializer Instance of the implementation of {@link KafkaDeserializer} + */ + public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) { + this.kafkaUtils = new KafkaUtils(props, null, timeout); + this.deserializer = deserializer; + this.topic = topic; + } + + private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) { + this.kafkaUtils = kafkaUtils; + this.deserializer = deserializer; + this.topic = topic; + } + + @Override + public void onCreate(int id) { + this.buffer = new ArrayList<>(100); + this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic)); + } + + @Override + public boolean isFinished() { + return false; + } + + @Override + public boolean hasNext() { + if (buffer.isEmpty()) { + try { + buffer.addAll(kafkaUtils.getKafkaMessages()); + } catch 
(Exception ex) { + Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + } + } + return buffer.size() > 0; + } + + @Override + public ContentEvent nextEvent() { + // assume this will never be called when buffer is empty! + return this.deserializer.deserialize(buffer.remove(0)); + } + + @Override + public boolean process(ContentEvent event) { + return false; + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; + return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic); + } + + @Override + protected void finalize() throws Throwable { + kafkaUtils.closeConsumer(); + super.finalize(); + } + +}
