Hi Santhosh,

I don't think you need to rewrite the storm-kafka-client pom to switch Kafka versions. I've attached a minimal pom that should work for what you want to do. You'll need to use storm-kafka-client 1.1.0, since 1.0.2 isn't compatible with Kafka 0.10.
Note that the pom bundles kafka-clients in the resulting jar. If you want to generate a smaller jar, you can declare the client with "provided" scope and copy the Kafka client jar into your cluster's /extlib. See https://github.com/apache/storm/blob/1.x-branch/examples/storm-kafka-client-examples/pom.xml for a pom that does it that way.

2017-02-01 13:28 GMT+01:00 Santhosh Kumar Sagi <[email protected]>:

> Hi team,
>
> I am developing a sample project for Storm - Kafka integration, where I am
> enabling SSL and ACL feature for Kafka - using Kafka 0.10.x.
>
> I have used storm src code for storm-kafka-client to check for new
> KafkaSpout that supports for Kafka streaming data. But, I am running into
> multiple issues while testing the code for same.
>
>
> 1. Getting below exception while I try to run Storm Topology with 1.0.1
> and Kafka-client 0.10.1.1
>
> 25959 [Thread-23-kafka_spout-executor[4 4]] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.9.0.1
> 25959 [Thread-23-kafka_spout-executor[4 4]] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
> 25974 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Async loop died!
> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
>     at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
> 25994 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.d.executor -
> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
>     at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
> 26028 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
>
>
> It seems that the Storm-Kafka-Client 1.0.2 (latest) is having dependency
> with Kafka version 0.9.0.1. Even when I try to resolve the dependency using
> 0.10.1.1, while running topology for sending data through Kafka Producer is
> throwing the above exception.
>
>
> Please do suggest any other workaround if available. As, I see that there
> latest version of storm 1.1.0 is having the dependency for Kafka 0.10, but
> which is NOT available in GIT. And I am help less here to test the changes
> further.
>
> Do let me know any further information required from my side to resolve
> this. I'll be sending as attachments. PFA my POM.xml used for the project.
>
> Expecting an early reply.
>
> Thanks,
> Santhosh
>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>TestKafkaSpout</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
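
For the smaller-jar option mentioned in the reply, a minimal sketch of the dependency change might look like the fragment below. It is an illustration rather than part of the attached pom: it assumes the same kafka-clients version (0.10.1.1) and that a matching kafka-clients jar has already been copied into the extlib directory on the cluster, since the shaded jar will no longer contain it.

```xml
<!-- Sketch only: would replace the kafka-clients dependency in the pom above. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
    <!-- "provided" scope keeps the client on the compile classpath but out of
         the shaded jar, so the cluster has to supply the jar at runtime. -->
    <scope>provided</scope>
</dependency>
```

With this scope the topology still compiles against Kafka 0.10.1.1, but it will only run if the jar supplied by the cluster is a compatible version.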
