Fix Kafka versions (pom.xml and samoa-samza issue)

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/9180ab18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/9180ab18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/9180ab18
Branch: refs/heads/master
Commit: 9180ab18248a38fd86162c0a6b5fe4999e9531f2
Parents: a5cda69
Author: pwawrzyniak <[email protected]>
Authored: Wed Jul 5 12:10:05 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 samoa-api/pom.xml                               |   8 +-
 .../streams/kafka/KafkaEntranceProcessor.java   |  21 +++
 .../samoa/streams/kafka/OosTestSerializer.java  | 135 +++++++++++--------
 .../org/apache/samoa/utils/SystemsUtils.java    |   6 +-
 5 files changed, 108 insertions(+), 64 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ecc713d..90d6a5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
 <jcip-annotations.version>1.0</jcip-annotations.version>
 <jmockit.version>1.13</jmockit.version>
 <junit.version>4.10</junit.version>
- <kafka.version>0.8.1</kafka.version>
+ <kafka.version>0.10.2.0</kafka.version>
 <kryo.version>2.21</kryo.version>
 <metrics-core.version>2.2.0</metrics-core.version>
 <miniball.version>1.0.3</miniball.version>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-api/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index e1e0b68..191bc1a 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -104,24 +104,24 @@ limitations under the License.
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.10.2.0</version> + <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.10.2.0</version> + <version>${kafka.version}</version> <classifier>test</classifier> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> - <version>0.10.2.0</version> + <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> - <version>0.10.2.0</version> + <version>${kafka.version}</version> <classifier>test</classifier> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index 83039dc..866a457 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -13,6 +13,27 @@ */ package org.apache.samoa.streams.kafka; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + import java.util.ArrayList; import java.util.Arrays; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java index 2b64bec..14535bb 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java @@ -1,58 +1,77 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.samoa.streams.kafka; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.apache.samoa.learners.InstanceContentEvent; - -/** - * - * @author Piotr Wawrzyniak - */ -public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { - - @Override - public InstanceContentEvent deserialize(byte[] message) { - try { - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message)); - InstanceContentEvent ice = (InstanceContentEvent)ois.readObject(); - return ice; - } catch (IOException | ClassNotFoundException ex) { - Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); - } - return null; - } - - @Override - public byte[] serialize(InstanceContentEvent message) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(message); - oos.flush(); - return baos.toByteArray(); - } catch (IOException ex) { - Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); - } - return null; - } - - -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.learners.InstanceContentEvent; + +/** + * + * @author Piotr Wawrzyniak + */ +public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { + + @Override + public InstanceContentEvent deserialize(byte[] message) { + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message)); + InstanceContentEvent ice = (InstanceContentEvent)ois.readObject(); + return ice; + } catch (IOException | ClassNotFoundException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + @Override + public byte[] serialize(InstanceContentEvent message) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(message); + oos.flush(); + return baos.toByteArray(); + } catch (IOException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9180ab18/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java ---------------------------------------------------------------------- diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java index ad2b383..d1e3a53 100644 --- a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java +++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java @@ -32,7 +32,9 @@ import java.util.Map; import java.util.Properties; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkMarshallingError; @@ -72,7 +74,9 @@ public class SystemsUtils { * Create Kafka topic/stream */ static void createKafkaTopic(String name, int partitions, int replicas) { - AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties()); + // Fix for Apache Kafka 0.10 + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + AdminUtils.createTopic(zkUtils, name, partitions, replicas, new Properties(), RackAwareMode.Disabled$.MODULE$); } static class ZKStringSerializerWrapper implements ZkSerializer {
