Correction in order of messages

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/84c94874
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/84c94874
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/84c94874

Branch: refs/heads/master
Commit: 84c94874f6d80fbf2b232beb2f573cd6b478435c
Parents: eeb0691
Author: pwawrzyniak <[email protected]>
Authored: Wed Jul 5 09:09:19 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 samoa-api/pom.xml                               | 308 +++++++++----------
 .../streams/kafka/KafkaEntranceProcessor.java   | 229 +++++++-------
 2 files changed, 259 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/84c94874/samoa-api/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index 2b7bd22..e1e0b68 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -1,154 +1,154 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  #%L
-  SAMOA
-  %%
-  Copyright (C) 2014 - 2015 Apache Software Foundation
-  %%
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-#L%
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    </properties>
-
-    <name>samoa-api</name>
-    <description>API and algorithms for SAMOA</description>
-
-    <artifactId>samoa-api</artifactId>
-    <parent>
-        <groupId>org.apache.samoa</groupId>
-        <artifactId>samoa</artifactId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-    </parent>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.yammer.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>${metrics-core.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>net.jcip</groupId>
-            <artifactId>jcip-annotations</artifactId>
-            <version>${jcip-annotations.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <version>${commons-lang3.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.github.javacliparser</groupId>
-            <artifactId>javacliparser</artifactId>
-            <version>${javacliparser.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.samoa</groupId>
-            <artifactId>samoa-instances</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.esotericsoftware.kryo</groupId>
-            <artifactId>kryo</artifactId>
-            <version>${kryo.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.dreizak</groupId>
-            <artifactId>miniball</artifactId>
-            <version>${miniball.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-        </dependency>
-        
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>0.10.2.0</version>
-        </dependency>
-            <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>0.10.2.0</version>
-      <classifier>test</classifier>
-      <scope>test</scope>
-    </dependency>
-        <dependency>
-    <groupId>org.apache.kafka</groupId>
-    <artifactId>kafka_2.11</artifactId>
-    <version>0.10.2.0</version>
-</dependency>
-        <dependency>
-    <groupId>org.apache.kafka</groupId>
-    <artifactId>kafka_2.11</artifactId>
-    <version>0.10.2.0</version>
-    <classifier>test</classifier>
-    <scope>test</scope>
-</dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <version>${maven-dependency-plugin.version}</version>
-                <executions>
-                    <execution>
-                        <id>copy-dependencies</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>copy-dependencies</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
-                            <overWriteReleases>false</overWriteReleases>
-                            <overWriteSnapshots>false</overWriteSnapshots>
-                            <overWriteIfNewer>true</overWriteIfNewer>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2014 - 2015 Apache Software Foundation
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+#L%
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-api</name>
+    <description>API and algorithms for SAMOA</description>
+
+    <artifactId>samoa-api</artifactId>
+    <parent>
+        <groupId>org.apache.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${metrics-core.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>net.jcip</groupId>
+            <artifactId>jcip-annotations</artifactId>
+            <version>${jcip-annotations.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.javacliparser</groupId>
+            <artifactId>javacliparser</artifactId>
+            <version>${javacliparser.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.samoa</groupId>
+            <artifactId>samoa-instances</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dreizak</groupId>
+            <artifactId>miniball</artifactId>
+            <version>${miniball.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.2.0</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.2.0</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/84c94874/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index ea5d06e..83039dc 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -1,124 +1,105 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.core.Processor;
-
-/**
- * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a>
- * @author pwawrzyniak
- * @version 0.5.0-incubating-SNAPSHOT
- * @since 0.5.0-incubating
- */
-public class KafkaEntranceProcessor implements EntranceProcessor {
-
-    transient private final KafkaUtils kafkaUtils;
-    private List<byte[]> buffer;
-    private final KafkaDeserializer deserializer;
-    private final String topic;
-
-    /**
-     * Class constructor
-     * @param props Properties of Kafka consumer
-     * @see  <a href="https://kafka.apache.org/documentation/#newconsumerconfigs"> Apache Kafka consumer configuration</a>
-     * @param topic Topic from which the messages should be read
-     * @param timeout Timeout used when polling Kafka for new messages
-     * @param deserializer Instance of the implementation of {@link KafkaDeserializer}
-     */
-    public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) {
-        this.kafkaUtils = new KafkaUtils(props, null, timeout);
-        this.deserializer = deserializer;
-        this.topic = topic;
-    }
-
-    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) {
-        this.kafkaUtils = kafkaUtils;
-        this.deserializer = deserializer;
-        this.topic = topic;
-    }
-
-    @Override
-    public void onCreate(int id) {
-        this.buffer = new ArrayList<>(100);
-        this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic));
-    }
-
-    @Override
-    public boolean isFinished() {
-        return false;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (buffer.isEmpty()) {
-            try {
-                buffer.addAll(kafkaUtils.getKafkaMessages());
-            } catch (Exception ex) {
-                Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex);
-            }
-        }
-        return buffer.size() > 0;
-    }
-
-    @Override
-    public ContentEvent nextEvent() {
-        // assume this will never be called when buffer is empty!        
-        return this.deserializer.deserialize(buffer.remove(buffer.size() - 1));
-    }
-
-    @Override
-    public boolean process(ContentEvent event) {
-        return false;
-    }
-
-    @Override
-    public Processor newProcessor(Processor processor) {
-        KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor;
-        return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic);
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        kafkaUtils.closeConsumer();
-        super.finalize(); //To change body of generated methods, choose Tools | Templates.
-    }
-    
-}
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a>
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaEntranceProcessor implements EntranceProcessor {
+
+    transient private final KafkaUtils kafkaUtils;
+    private List<byte[]> buffer;
+    private final KafkaDeserializer deserializer;
+    private final String topic;
+
+    /**
+     * Class constructor
+     * @param props Properties of Kafka consumer
+     * @see  <a href="https://kafka.apache.org/documentation/#newconsumerconfigs"> Apache Kafka consumer configuration</a>
+     * @param topic Topic from which the messages should be read
+     * @param timeout Timeout used when polling Kafka for new messages
+     * @param deserializer Instance of the implementation of {@link KafkaDeserializer}
+     */
+    public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) {
+        this.kafkaUtils = new KafkaUtils(props, null, timeout);
+        this.deserializer = deserializer;
+        this.topic = topic;
+    }
+
+    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) {
+        this.kafkaUtils = kafkaUtils;
+        this.deserializer = deserializer;
+        this.topic = topic;
+    }
+
+    @Override
+    public void onCreate(int id) {
+        this.buffer = new ArrayList<>(100);
+        this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic));
+    }
+
+    @Override
+    public boolean isFinished() {
+        return false;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (buffer.isEmpty()) {
+            try {
+                buffer.addAll(kafkaUtils.getKafkaMessages());
+            } catch (Exception ex) {
+                Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex);
+            }
+        }
+        return buffer.size() > 0;
+    }
+
+    @Override
+    public ContentEvent nextEvent() {
+        // assume this will never be called when buffer is empty!        
+        return this.deserializer.deserialize(buffer.remove(0));
+    }
+
+    @Override
+    public boolean process(ContentEvent event) {
+        return false;
+    }
+
+    @Override
+    public Processor newProcessor(Processor processor) {
+        KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor;
+        return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic);
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        kafkaUtils.closeConsumer();
+        super.finalize();
+    }
+    
+}

Reply via email to