Fix Kafka versions (pom.xml and samoa-samza issue)

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/9180ab18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/9180ab18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/9180ab18

Branch: refs/heads/master
Commit: 9180ab18248a38fd86162c0a6b5fe4999e9531f2
Parents: a5cda69
Author: pwawrzyniak <[email protected]>
Authored: Wed Jul 5 12:10:05 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 samoa-api/pom.xml                               |   8 +-
 .../streams/kafka/KafkaEntranceProcessor.java   |  21 +++
 .../samoa/streams/kafka/OosTestSerializer.java  | 135 +++++++++++--------
 .../org/apache/samoa/utils/SystemsUtils.java    |   6 +-
 5 files changed, 108 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ecc713d..90d6a5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
         <jcip-annotations.version>1.0</jcip-annotations.version>
         <jmockit.version>1.13</jmockit.version>
         <junit.version>4.10</junit.version>
-        <kafka.version>0.8.1</kafka.version>
+        <kafka.version>0.10.2.0</kafka.version>
         <kryo.version>2.21</kryo.version>
         <metrics-core.version>2.2.0</metrics-core.version>
         <miniball.version>1.0.3</miniball.version>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-api/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index e1e0b68..191bc1a 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -104,24 +104,24 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.10.2.0</version>
+            <version>${kafka.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.10.2.0</version>
+            <version>${kafka.version}</version>
             <classifier>test</classifier>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
-            <version>0.10.2.0</version>
+            <version>${kafka.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
-            <version>0.10.2.0</version>
+            <version>${kafka.version}</version>
             <classifier>test</classifier>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index 83039dc..866a457 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -13,6 +13,27 @@
  */
 package org.apache.samoa.streams.kafka;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
index 2b64bec..14535bb 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
@@ -1,58 +1,77 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.apache.samoa.learners.InstanceContentEvent;
-
-/**
- *
- * @author Piotr Wawrzyniak
- */
-public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
-
-    @Override
-    public InstanceContentEvent deserialize(byte[] message) {
-        try {
-            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message));
-            InstanceContentEvent ice = (InstanceContentEvent)ois.readObject();
-            return ice;
-        } catch (IOException | ClassNotFoundException ex) {
-            Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
-        }
-        return null;
-    }
-
-    @Override
-    public byte[] serialize(InstanceContentEvent message) {
-        try {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(message);
-            oos.flush();
-            return baos.toByteArray();            
-        } catch (IOException ex) {
-            Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
-        }        
-        return null;
-    }
-    
-    
-}
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+
+/**
+ *
+ * @author Piotr Wawrzyniak
+ */
+public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
+
+    @Override
+    public InstanceContentEvent deserialize(byte[] message) {
+        try {
+            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message));
+            InstanceContentEvent ice = (InstanceContentEvent)ois.readObject();
+            return ice;
+        } catch (IOException | ClassNotFoundException ex) {
+            Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] serialize(InstanceContentEvent message) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(message);
+            oos.flush();
+            return baos.toByteArray();            
+        } catch (IOException ex) {
+            Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }        
+        return null;
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
index ad2b383..d1e3a53 100644
--- a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
+++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Properties;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.utils.ZKStringSerializer;
+import kafka.utils.ZkUtils;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
@@ -72,7 +74,9 @@ public class SystemsUtils {
      * Create Kafka topic/stream
      */
     static void createKafkaTopic(String name, int partitions, int replicas) {
-      AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties());
+        // Fix for Apache Kafka 0.10
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+      AdminUtils.createTopic(zkUtils, name, partitions, replicas, new Properties(), RackAwareMode.Disabled$.MODULE$);
     }
 
     static class ZKStringSerializerWrapper implements ZkSerializer {

Reply via email to