Thanks for the tip, Vinoth. We were able to fix the issue: our Spark cluster (2.2.0) bundled both the spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10 jars. Removing the spark-streaming-kafka-0-10 jars from the cluster resolved the ClassCastException.
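
For anyone who hits the same ClassCastException, here is a minimal sketch of how one might check a jar directory for both connector lines at once. It is not part of Hudi or Spark; the function names are made up for illustration, and the assumption is that the cluster keeps its jars in a single directory such as $SPARK_HOME/jars with the usual spark-streaming-kafka-<line>_<scala>-<spark>.jar naming. Adjust the path and the pattern if your distribution differs.

```python
import re
import tempfile
from pathlib import Path

# Assumed naming: spark-streaming-kafka-0-8_2.11-2.2.0.jar,
# spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar, and so on.
# The capture group is the connector line, e.g. "0-8" or "0-10".
CONNECTOR_JAR = re.compile(r"^spark-streaming-kafka-(\d+-\d+)[-_].*\.jar$")


def find_kafka_connectors(jar_dir):
    """Map each connector line (e.g. "0-8") to the jar file names found for it."""
    found = {}
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        match = CONNECTOR_JAR.match(jar.name)
        if match:
            found.setdefault(match.group(1), []).append(jar.name)
    return found


def has_conflict(found):
    """True when more than one connector line sits in the same directory."""
    return len(found) > 1


if __name__ == "__main__":
    # Demo against a throwaway directory with hypothetical jar names;
    # point find_kafka_connectors at your real jar directory instead.
    with tempfile.TemporaryDirectory() as jar_dir:
        for name in (
            "spark-core_2.11-2.2.0.jar",
            "spark-streaming-kafka-0-8_2.11-2.2.0.jar",
            "spark-streaming-kafka-0-10_2.11-2.2.0.jar",
        ):
            (Path(jar_dir) / name).touch()
        connectors = find_kafka_connectors(jar_dir)
        for line, jars in sorted(connectors.items()):
            print(line, jars)
        if has_conflict(connectors):
            print("Both connector lines are present; keep only one.")
```

In our case the 0-8 connector was the one to keep, since that is what the hudi-utilities-bundle we were running is built against.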

On Oct 1, 2019, at 10:25 AM, Vinoth Chandar <vin...@apache.org> wrote:

Thanks for the detailed notes, they help. Could you take a quick shot at overriding the version in a custom build? I'm wondering whether just upgrading Kafka would suffice for your scenario (without needing the 2.12 Scala bundle).

On Tue, Oct 1, 2019 at 10:14 AM Gautam Nayak <gna...@guardanthealth.com> wrote:

Thanks, Nishith, for the help. We figured out that Kafka 0.8.2 (clients/broker) doesn't support SSL/TLS, so we created a non-SSL/TLS AWS MSK cluster (now Kafka 1.1.1) and ran the HoodieDeltaStreamer. We are getting the following exception. I think waiting for HUDI-238 <https://issues.apache.org/jira/browse/HUDI-238> (which also plans an upgrade to Kafka 0.10) would be the right thing, since all our existing Kafka clusters are already on 2.0+. If there is a solution for this error, we would still go for it.

Caused by: org.apache.spark.SparkException: Couldn't connect to leader for topic KAFKA_TEST 0: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:168)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:168)
    at scala.util.Either.fold(Either.scala:98)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:167)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:159)
    at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:339)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Gautam

On Sep 30, 2019, at 8:52 PM, nishith agarwal <n3.nas...@gmail.com> wrote:

Gautam,

If the MSK version (which I'm assuming is AWS MSK) is compatible with kafka-client 0.8, then it seems like this might be an authentication issue.
Some details in this post: https://stackoverflow.com/questions/48164748/kafka-java-io-eofexception-networkreceive-readfromreadablechannel

Thanks,
Nishith

On Mon, Sep 30, 2019 at 12:39 PM Gautam Nayak <gna...@guardanthealth.com> wrote:

When executing a HoodieDeltaStreamer job, we run into the exception below. I see that hudi-utilities-bundle is packaged with the kafka-0.8 client libs, but I believe it should be compatible with the MSK version of Kafka. Any pointers on what the issue could be?

Spark - 2.2.1
Kafka - MSK 2.1.0

19/09/30 19:10:56 main INFO SimpleConsumer: Reconnect due to error:
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:111)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:132)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:361)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:361)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:132)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
    at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:55)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:72)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:299)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:218)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:123)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:926)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:204)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:229)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/09/30 19:10:57 main INFO KafkaOffsetGen: HUDI -- getNextOffsetRanges -- topic name is KAFKA_TEST

Thanks,
Gautam