Hi team,

I am developing a sample project for Storm - Kafka integration, where I am
enabling SSL and ACL feature for Kafka - using Kafka 0.10.x.

I have used storm src code for storm-kafka-client to check for new
KafkaSpout that supports for Kafka streaming data. But, I am running into
multiple issues while testing the code for same.


1. Getting below exception while I try to run Storm Topology with 1.0.1 and
Kafka-client 0.10.1.1

25959 [Thread-23-kafka_spout-executor[4 4]] INFO  o.a.k.c.u.AppInfoParser -
Kafka version : 0.9.0.1
25959 [Thread-23-kafka_spout-executor[4 4]] INFO  o.a.k.c.u.AppInfoParser -
Kafka commitId : 23c69d62a0cabf06
25974 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Async loop
died!
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.AFn.run(AFn.java:22)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
25994 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.d.executor -
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640)
~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.AFn.run(AFn.java:22)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
26028 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Halting
process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:423)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at
org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.AFn.run(AFn.java:22)
[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]


It seems that the Storm-Kafka-Client 1.0.2 (latest) is having dependency
with Kafka version 0.9.0.1. Even when I try to resolve the dependency using
0.10.1.1, while running topology for sending data through Kafka Producer is
throwing the above exception.


Please do suggest any other workaround if available. As, I see that there
latest version of storm 1.1.0 is having the dependency for Kafka 0.10, but
which is NOT available in GIT. And I am help less here to test the changes
further.

Do let me know any further information required from my side to resolve
this. I'll be sending as attachments. PFA my POM.xml used for the project.

Expecting an early reply.

Thanks,
Santhosh
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";>
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<artifactId>storm</artifactId>
		<groupId>org.apache.storm</groupId>
		<version>2.0.0-SNAPSHOT</version>
		<relativePath>../../pom.xml</relativePath>
	</parent>

	<artifactId>storm-kafka-client</artifactId>
	<name>storm-kafka-client</name>

	<packaging>jar</packaging>

	<developers>
		<developer>
			<id>hmcl</id>
			<name>Hugo Louro</name>
			<email>[email protected]</email>
		</developer>
	</developers>

	<dependencies>
		<!--parent module dependency -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>1.0.1</version>
			<exclusions>
				<exclusion>
					<groupId>storm</groupId>
					<artifactId>storm</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!--kafka libraries -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.1</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>1.0.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-starter</artifactId>
			<version>1.0.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka-client</artifactId>
			<version>1.0.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>0.10.1.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!--test dependencies -->
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8.0_111</version>
			<scope>system</scope>
			<systemPath>/home/cloudera/Downloads/jdk1.8.0_111/lib/tools.jar</systemPath>
		</dependency>

		<dependency>
			<groupId>org.mockito</groupId>
			<artifactId>mockito-core</artifactId>
			<version>${mockito.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.hamcrest</groupId>
			<artifactId>hamcrest-all</artifactId>
			<version>1.3</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>info.batey.kafka</groupId>
			<artifactId>kafka-unit</artifactId>
			<version>0.6</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>log4j-over-slf4j</artifactId>
			<version>${log4j-over-slf4j.version}</version>
			<scope>test</scope>
		</dependency>

	</dependencies>

	<build>
		<!-- <directory>${project.basedir}/target</directory> <outputDirectory>${project.build.directory}/classes</outputDirectory> 
			<sourceDirectory>${project.basedir}/src</sourceDirectory> -->
		<plugins>
			<!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> 
				<version>2.10</version> <executions> <execution> <id>copy-dependencies</id> 
				<phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> 
				<outputDirectory>${project.build.directory}/dependencies</outputDirectory> 
				<overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> 
				<overWriteIfNewer>true</overWriteIfNewer> </configuration> </execution> </executions> 
				</plugin> -->
			<!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> 
				<version>2.5</version> <executions> <execution> <goals> <goal>test-jar</goal> 
				</goals> </execution> </executions> </plugin> -->
			<!-- <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> 
				<version>1.5.0</version> <executions> <execution> <id>thickbuild</id> <phase>package</phase> 
				<inherited>false</inherited> <goals> <goal>exec</goal> </goals> <configuration> 
				<executable>sh</executable> <arguments> <argument>${basedir}/thickjar1.sh</argument> 
				</arguments> </configuration> </execution> </executions> </plugin> -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass></mainClass>
						</manifest>
					</archive>
					<!-- <descriptor>src/main/assembly/assembly.xml</descriptor> -->
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
						<configuration>
							<archive>
								<manifest>
									<mainClass>org/apache/storm/kafka/spout/testAgents/TestKafkaSpoutTopologyMainNamedTopics</mainClass>
								</manifest>
							</archive>
						</configuration>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>2.4.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<filters>
								<filter>
									<artifact>storm-core:storm-core</artifact>
									<excludes>
										<exclude>defaults.yaml</exclude>
									</excludes>
								</filter>
							</filters>

							<transformers>
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Reply via email to