spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
a Kafka parameter. Set it on the SparkConf rather than in kafkaParams.

See http://spark.apache.org/docs/latest/configuration.html
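
As a rough illustration of that split, here is a minimal sketch in plain Java
with no Spark dependency. The class PollSettings and its methods are
hypothetical names made up for this example, and the "spark." prefix check is
only a convention-based heuristic. The idea is that keys starting with
"spark." go to the SparkConf (for instance via sparkConf.set(key, value)),
while everything else stays in kafkaParams. The values are the ones mentioned
in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PollSettings {
    // Heuristic (an assumption of this sketch): Spark properties start with "spark.".
    public static boolean isSparkProperty(String key) {
        return key.startsWith("spark.");
    }

    // Entries that should be applied to the SparkConf.
    public static Map<String, Object> sparkProperties(Map<String, Object> all) {
        return filter(all, true);
    }

    // Entries that should be passed to the Kafka consumer via kafkaParams.
    public static Map<String, Object> kafkaParams(Map<String, Object> all) {
        return filter(all, false);
    }

    private static Map<String, Object> filter(Map<String, Object> all, boolean wantSpark) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (isSparkProperty(e.getKey()) == wantSpark) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> settings = new LinkedHashMap<>();
        settings.put("group.id", "example-group"); // placeholder value
        settings.put("max.poll.interval.ms", 12000);
        settings.put("spark.streaming.kafka.consumer.poll.ms", "10000");

        // With Spark on the classpath, the first map would be applied as:
        //   sparkProperties(settings).forEach((k, v) -> sparkConf.set(k, v.toString()));
        System.out.println("SparkConf:   " + sparkProperties(settings));
        System.out.println("kafkaParams: " + kafkaParams(settings));
    }
}
```

The same property can usually also be supplied at launch time with
spark-submit --conf, which avoids hard-coding it in the application.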

On Tue, Nov 14, 2017 at 8:56 PM, jkagitala <jka...@gmail.com> wrote:
> Hi,
>
> I'm trying to consume our Kafka topic with Spark Streaming, but I keep
> getting this error:
> java.lang.AssertionError: assertion failed: Failed to get record after
> polling for 512 ms.
>
> I tried setting different params, such as max.poll.interval.ms and
> spark.streaming.kafka.consumer.poll.ms, to 10000 ms in kafkaParams,
> but I still get "Failed to get record after polling for 512 ms".
> Adding these params does not seem to change the polling time.
>
> Without Spark Streaming, I'm able to fetch the records. I get this error
> only when Spark Streaming is added.
>
> Any help is greatly appreciated. Below is the code I'm using.
>
> SparkConf sparkConf = new SparkConf()
>         .setAppName("JavaFlingerSparkApplication")
>         .setMaster("local[*]");
> JavaStreamingContext ssc =
>         new JavaStreamingContext(sparkConf, Durations.seconds(10));
>
> kafkaParams.put("bootstrap.servers", hosts);
> kafkaParams.put("group.id", groupid);
> kafkaParams.put("enable.auto.commit", false);
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", BytesDeserializer.class);
> kafkaParams.put("auto.offset.reset", "earliest");
> //kafkaParams.put("max.poll.interval.ms", 12000);
> //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> //kafkaParams.put("request.timeout.ms", 12000);
>
>
> JavaInputDStream<ConsumerRecord<String, List<Bytes>>> messages =
>         KafkaUtils.createDirectStream(ssc,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.Subscribe(topics, kafkaParams));
> messages.foreachRDD(rdd -> {
>         List<ConsumerRecord<String, List<Bytes>>> input = rdd.collect();
>         System.out.println("count is" + input.size());
> });
> ssc.start();
> ssc.awaitTermination();
>
> Thanks
> Jagadish
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
