I don't see anything obviously wrong; you'd need to do more troubleshooting.

You could also try creating a single RDD for a known range of offsets:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd
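
A rough, untested sketch of that check in Java, assuming the
spark-streaming-kafka-0-10 artifact is on the classpath. The class name,
topic name, partition, offsets, broker address and group id are all
placeholders, not values from this thread; substitute a range you know
contains records. The poll timeout is set on the SparkConf because it is
a Spark setting rather than a Kafka one.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class KnownOffsetRangeCheck {

    // Kafka consumer settings only; Spark settings do not belong in this map.
    static Map<String, Object> kafkaParams(String hosts, String groupId) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", hosts);
        params.put("group.id", groupId);
        params.put("key.deserializer", StringDeserializer.class);
        params.put("value.deserializer", BytesDeserializer.class);
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("KnownOffsetRangeCheck")
                .setMaster("local[*]")
                // Spark setting: goes on SparkConf, not into kafkaParams.
                .set("spark.streaming.kafka.consumer.poll.ms", "10000");

        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        try {
            // Placeholder range: topic "my-topic", partition 0, offsets 0 (inclusive)
            // to 100 (exclusive). Pick offsets that actually exist in your topic.
            OffsetRange[] offsetRanges = {
                OffsetRange.create("my-topic", 0, 0L, 100L)
            };

            JavaRDD<ConsumerRecord<String, Bytes>> rdd = KafkaUtils.createRDD(
                    sc,
                    kafkaParams("localhost:9092", "offset-range-check"),
                    offsetRanges,
                    LocationStrategies.PreferConsistent());

            // Count the records without collecting them to the driver.
            System.out.println("count is " + rdd.count());
        } finally {
            sc.stop();
        }
    }
}
```

If this batch read returns the expected number of records, the brokers and
deserializers are probably fine, and the stream setup (subscription, group
id, starting offsets) is the next place to look.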

On Wed, Nov 15, 2017 at 9:33 PM, jagadish kagitala <jka...@gmail.com> wrote:
> Hi Cody,
>
> It worked after moving the parameter to sparkConf; I no longer see that error.
> But now the count for each RDD returns 0, even though there are records
> in the topic I'm reading.
>
> Do you see anything wrong with how I'm creating the direct stream?
>
> Thanks
> Jagadish
>
> On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
>> a Kafka parameter, so it belongs in the SparkConf rather than in kafkaParams.
>>
>> See http://spark.apache.org/docs/latest/configuration.html
>>
>> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala <jka...@gmail.com> wrote:
>> > Hi,
>> >
>> > I'm trying to add Spark Streaming to our Kafka topic, but I keep getting
>> > this error:
>> > java.lang.AssertionError: assertion failed: Failed to get record after
>> > polling for 512 ms.
>> >
>> > I tried setting different params, such as max.poll.interval.ms and
>> > spark.streaming.kafka.consumer.poll.ms, to 10000 ms in kafkaParams,
>> > but I still get "Failed to get record after polling for 512 ms". It seems
>> > that adding the above params doesn't change the polling time.
>> >
>> > Without Spark Streaming, I'm able to fetch the records. I get this error
>> > only with the Spark Streaming add-on.
>> >
>> > Any help is greatly appreciated. Below is the code I'm using.
>> >
>> > SparkConf sparkConf = new SparkConf()
>> >         .setAppName("JavaFlingerSparkApplication")
>> >         .setMaster("local[*]");
>> > JavaStreamingContext ssc =
>> >         new JavaStreamingContext(sparkConf, Durations.seconds(10));
>> >
>> > kafkaParams.put("bootstrap.servers", hosts);
>> > kafkaParams.put("group.id", groupid);
>> > kafkaParams.put("enable.auto.commit", false);
>> > kafkaParams.put("key.deserializer", StringDeserializer.class);
>> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
>> > kafkaParams.put("auto.offset.reset", "earliest");
>> > //kafkaParams.put("max.poll.interval.ms", 12000);
>> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
>> > //kafkaParams.put("request.timeout.ms", 12000);
>> >
>> >
>> > JavaInputDStream<ConsumerRecord<String, List<Bytes>>> messages =
>> >         KafkaUtils.createDirectStream(
>> >                 ssc,
>> >                 LocationStrategies.PreferConsistent(),
>> >                 ConsumerStrategies.Subscribe(topics, kafkaParams));
>> > messages.foreachRDD(rdd -> {
>> >     List<ConsumerRecord<String, List<Bytes>>> input = rdd.collect();
>> >     System.out.println("count is" + input.size());
>> > });
>> > ssc.start();
>> > ssc.awaitTermination();
>> >
>> > Thanks
>> > Jagadish
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>
