Hi team,
I am developing a sample project for Storm - Kafka integration, where I am
enabling SSL and ACL feature for Kafka - using Kafka 0.10.x.
I have used storm src code for storm-kafka-client to check for new
KafkaSpout that supports for Kafka streaming data. But, I am running into
multiple issues while testing the code for same.
1. Getting below exception while I try to run Storm Topology with 1.0.1 and
Kafka-client 0.10.1.1
25959 [Thread-23-kafka_spout-executor[4 4]] INFO o.a.k.c.u.AppInfoParser -
Kafka version : 0.9.0.1
25959 [Thread-23-kafka_spout-executor[4 4]] INFO o.a.k.c.u.AppInfoParser -
Kafka commitId : 23c69d62a0cabf06
25974 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Async loop
died!
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at clojure.lang.AFn.run(AFn.java:22)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
25994 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.d.executor -
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at clojure.lang.AFn.run(AFn.java:22)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
26028 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Halting
process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at clojure.lang.RestFn.invoke(RestFn.java:423)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at clojure.lang.AFn.run(AFn.java:22)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
It seems that the Storm-Kafka-Client 1.0.2 (latest) is having dependency
with Kafka version 0.9.0.1. Even when I try to resolve the dependency using
0.10.1.1, while running topology for sending data through Kafka Producer is
throwing the above exception.
Please do suggest any other workaround if available. As, I see that there
latest version of storm 1.1.0 is having the dependency for Kafka 0.10, but
which is NOT available in GIT. And I am help less here to test the changes
further.
Do let me know any further information required from my side to resolve
this. I'll be sending as attachments. PFA my POM.xml used for the project.
Expecting an early reply.
Thanks,
Santhosh
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>storm-kafka-client</artifactId>
<name>storm-kafka-client</name>
<packaging>jar</packaging>
<developers>
<developer>
<id>hmcl</id>
<name>Hugo Louro</name>
<email>[email protected]</email>
</developer>
</developers>
<dependencies>
<!--parent module dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--kafka libraries -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--test dependencies -->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8.0_111</version>
<scope>system</scope>
<systemPath>/home/cloudera/Downloads/jdk1.8.0_111/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>info.batey.kafka</groupId>
<artifactId>kafka-unit</artifactId>
<version>0.6</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j-over-slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<!-- <directory>${project.basedir}/target</directory> <outputDirectory>${project.build.directory}/classes</outputDirectory>
<sourceDirectory>${project.basedir}/src</sourceDirectory> -->
<plugins>
<!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version> <executions> <execution> <id>copy-dependencies</id>
<phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
<overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer> </configuration> </execution> </executions>
</plugin> -->
<!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId>
<version>2.5</version> <executions> <execution> <goals> <goal>test-jar</goal>
</goals> </execution> </executions> </plugin> -->
<!-- <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version> <executions> <execution> <id>thickbuild</id> <phase>package</phase>
<inherited>false</inherited> <goals> <goal>exec</goal> </goals> <configuration>
<executable>sh</executable> <arguments> <argument>${basedir}/thickjar1.sh</argument>
</arguments> </configuration> </execution> </executions> </plugin> -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<!-- <descriptor>src/main/assembly/assembly.xml</descriptor> -->
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>org/apache/storm/kafka/spout/testAgents/TestKafkaSpoutTopologyMainNamedTopics</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>storm-core:storm-core</artifact>
<excludes>
<exclude>defaults.yaml</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>