I don't see anything obvious; you'd need to do more troubleshooting. You could also try creating a single RDD for a known range of offsets:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd

On Wed, Nov 15, 2017 at 9:33 PM, jagadish kagitala <jka...@gmail.com> wrote:
> Hi Cody,
>
> It worked after moving the parameter to SparkConf; I don't see that error.
> But now the count for each RDD returns 0, even though there are records
> in the topic I'm reading.
>
> Do you see anything wrong with how I'm creating the direct stream?
>
> Thanks
> Jagadish
>
> On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
>> a Kafka parameter.
>>
>> See http://spark.apache.org/docs/latest/configuration.html
>>
>> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala <jka...@gmail.com> wrote:
>> > Hi,
>> >
>> > I'm trying to add Spark Streaming to our Kafka topic, but I keep getting
>> > this error:
>> > java.lang.AssertionError: assertion failed: Failed to get record after
>> > polling for 512 ms.
>> >
>> > I tried setting different params like max.poll.interval.ms and
>> > spark.streaming.kafka.consumer.poll.ms to 10000 ms in kafkaParams,
>> > but I still get "failed to get records after 512 ms". Adding those
>> > params doesn't seem to change the polling time.
>> >
>> > Without Spark Streaming, I'm able to fetch the records. Only with the
>> > Spark Streaming add-on do I get this error.
>> >
>> > Any help is greatly appreciated. Below is the code I'm using.
>> >
>> > SparkConf sparkConf = new SparkConf()
>> >     .setAppName("JavaFlingerSparkApplication")
>> >     .setMaster("local[*]");
>> > JavaStreamingContext ssc =
>> >     new JavaStreamingContext(sparkConf, Durations.seconds(10));
>> >
>> > kafkaParams.put("bootstrap.servers", hosts);
>> > kafkaParams.put("group.id", groupid);
>> > kafkaParams.put("auto.commit.enable", false);
>> > kafkaParams.put("key.deserializer", StringDeserializer.class);
>> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
>> > kafkaParams.put("auto.offset.reset", "earliest");
>> > //kafkaParams.put("max.poll.interval.ms", 12000);
>> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
>> > //kafkaParams.put("request.timeout.ms", 12000);
>> >
>> > JavaInputDStream<ConsumerRecord<String, List<Bytes>>> messages =
>> >     KafkaUtils.createDirectStream(ssc,
>> >         LocationStrategies.PreferConsistent(),
>> >         ConsumerStrategies.Subscribe(topics, kafkaParams));
>> > messages.foreachRDD(rdd -> {
>> >     List<ConsumerRecord<String, List<Bytes>>> input = rdd.collect();
>> >     System.out.println("count is " + input.size());
>> > });
>> > ssc.start();
>> > ssc.awaitTermination();
>> >
>> > Thanks
>> > Jagadish
>> >
>> > --
>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
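For reference, Cody's suggestion of reading a known offset range as a single RDD could be sketched roughly as below. This is a sketch, not a tested fix: the topic name, partition, offset bounds, broker address, and group id are all placeholder assumptions, and running it requires a reachable Kafka broker plus the spark-streaming-kafka-0-10 artifact on the classpath. Note that the poll timeout goes on the SparkConf, as Cody pointed out, not in kafkaParams.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class KnownOffsetsSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("KnownOffsetsSketch")
        .setMaster("local[*]")
        // Spark config, not a Kafka consumer parameter:
        .set("spark.streaming.kafka.consumer.poll.ms", "10000");
    JavaSparkContext sc = new JavaSparkContext(conf);

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "known-offsets-test");      // placeholder

    // Read exactly offsets [0, 100) from partition 0 of "my-topic"
    // (hypothetical topic); adjust to a range you know contains records.
    OffsetRange[] ranges = { OffsetRange.create("my-topic", 0, 0, 100) };

    JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
        sc, kafkaParams, ranges, LocationStrategies.PreferConsistent());

    // A non-zero count here confirms the offsets and broker config are good,
    // which helps isolate whether the streaming job's zero counts come from
    // the consumer config or from the stream's starting offsets.
    System.out.println("count is " + rdd.count());
    sc.stop();
  }
}
```

If this batch read returns records while the direct stream still counts 0, the committed offsets for the consumer group are a likely suspect, since "auto.offset.reset" only applies when the group has no committed offset.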