Hi Santhosh,

I don't think you need to rewrite the storm-kafka-client pom to switch
Kafka versions. I've attached a minimal pom that should work for what you
want to do. You'll need to use storm-kafka-client 1.1.0, since 1.0.2 isn't
compatible with Kafka 0.10.

Note that the pom bundles kafka-clients in the resulting jar. If you want to
generate a smaller jar, you can declare the client with "provided" scope and
copy the Kafka library into your cluster's extlib directory. See
https://github.com/apache/storm/blob/1.x-branch/examples/storm-kafka-client-examples/pom.xml
for a pom that does it that way.
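
As a rough sketch of the "provided" variant (written against the attached
pom, not copied from the linked example), only the kafka-clients dependency
would change. The version shown is the one from the attached pom, so adjust
it to match your brokers:

```xml
<!-- Sketch: kafka-clients is on the compile classpath but is not bundled
     into the shaded jar, because the shade plugin skips "provided"
     dependencies. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
    <scope>provided</scope>
</dependency>
```

With this scope the matching kafka-clients jar has to be present in extlib
on every worker node, otherwise the spout will fail to load the Kafka
classes at runtime.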

2017-02-01 13:28 GMT+01:00 Santhosh Kumar Sagi <[email protected]>:

> Hi team,
>
> I am developing a sample project for Storm - Kafka integration, where I am
> enabling SSL and ACL feature for Kafka - using Kafka 0.10.x.
>
> I have built storm-kafka-client from the Storm source code to try the new
> KafkaSpout that supports Kafka streaming data. But I am running into
> multiple issues while testing this code.
>
>
> 1. Getting below exception while I try to run Storm Topology with 1.0.1
> and Kafka-client 0.10.1.1
>
> 25959 [Thread-23-kafka_spout-executor[4 4]] INFO  o.a.k.c.u.AppInfoParser
> - Kafka version : 0.9.0.1
> 25959 [Thread-23-kafka_spout-executor[4 4]] INFO  o.a.k.c.u.AppInfoParser
> - Kafka commitId : 23c69d62a0cabf06
> 25974 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Async loop
> died!
> java.lang.NoSuchMethodError: org.apache.kafka.clients.
> consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/
> clients/consumer/ConsumerRebalanceListener;)V
>     at org.apache.storm.kafka.spout.internal.fetcher.
> AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.
> AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.
> KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at 
> org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at 
> org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484)
> [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-
> SNAPSHOT-jar-with-dependencies.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
> 25994 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.d.executor -
> java.lang.NoSuchMethodError: org.apache.kafka.clients.
> consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/
> clients/consumer/ConsumerRebalanceListener;)V
>     at org.apache.storm.kafka.spout.internal.fetcher.
> AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.
> AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.internal.fetcher.
> KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at 
> org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at 
> org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640)
> ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484)
> [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-
> SNAPSHOT-jar-with-dependencies.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
> 26028 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Halting
> process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
> [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.RestFn.invoke(RestFn.java:423)
> [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761)
> [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.daemon.executor$mk_executor_data$fn__
> 7773$fn__7774.invoke(executor.clj:271) [storm-kafka-client-2.0.0-
> SNAPSHOT-jar-with-dependencies.jar:?]
>     at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494)
> [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-
> SNAPSHOT-jar-with-dependencies.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
>
>
> It seems that storm-kafka-client 1.0.2 (the latest release) depends on
> Kafka version 0.9.0.1. Even when I try to resolve the dependency using
> 0.10.1.1, running the topology that sends data through the Kafka producer
> still throws the above exception.
>
>
> Please suggest any other workaround if one is available. I see that the
> latest version of Storm, 1.1.0, depends on Kafka 0.10, but it is NOT
> available in Git, so I am unable to test the changes any further.
>
> Do let me know any further information required from my side to resolve
> this. I'll be sending as attachments. PFA my POM.xml used for the project.
>
> Expecting an early reply.
>
> Thanks,
> Santhosh
>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>TestKafkaSpout</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
