Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Thank you. Let me see if I can reproduce this. We're not seeing offsets load 
correctly on startup - but perhaps there is an error on my side.

Bryan



From: Cody Koeninger 
Sent: Thursday, June 14, 2018 5:01:01 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's not an explicit api for it.  Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?


bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   0          1056            1656            600  -            -     -


scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] =
org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8
scala> c.subscribe(java.util.Arrays.asList("test"))
scala> c.poll(0)
scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056

On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is to
> load the offset from checkpoint.
>
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:24:58 PM
>
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey 
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>>
>> ________
>> From: Cody Koeninger 
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
>>> the documentation
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've set up a simple Kafka DStream:
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" ->
>>>     classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" ->
>>>     classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>>> DecodedData](topics.toSeq, kafkaParameters)
>>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>>
>>>
>>> I then commit the offsets:
>>>
>>> var offsets: Array[OffsetRange] = Array()
>>> stream.foreachRDD(rdd => {
>>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>>> })
>>>
>>> // Future: Move this after we've done processing.
>>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>>
>>> The offsets appear to commit successfully. However, on restart the
>>> streaming application consistently starts from latest whenever the Spark
>>> checkpoint is changed.  Drilling into the code it does not appear that
>>> re-loading offset data is supported in the Spark Streaming Kafka library.
>>> How is this expected to work?  Is there an example of saving the offsets
>>> to Kafka and then loading them from Kafka?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey

Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Cody Koeninger
Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's not an explicit api for it.  Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?


bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   0          1056            1656            600  -            -     -


scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] =
org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8
scala> c.subscribe(java.util.Arrays.asList("test"))
scala> c.poll(0)
scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056
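
(Aside: if you ever need to seed Spark's starting position explicitly
rather than relying on the group id, the 0-10 integration's
ConsumerStrategies.Subscribe also has an overload that takes a map of
starting offsets. A minimal, untested sketch, assuming the same
kafkaParams and "test" topic as in the session above:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Starting offsets per partition, e.g. the position read back above
// (1056 for test-0); any Map[TopicPartition, Long] works here.
val fromOffsets: Map[TopicPartition, Long] =
  Map(new TopicPartition("test", 0) -> 1056L)

// Scala overload: topics, consumer config, explicit starting offsets.
// kafkaParams is the same config map used for the KafkaConsumer above.
val strategy = ConsumerStrategies.Subscribe[String, String](
  Seq("test"), kafkaParams, fromOffsets)

Partitions absent from the map fall back to the committed offset for the
group id, or to auto.offset.reset.)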

On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is to
> load the offset from checkpoint.
>
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:24:58 PM
>
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey 
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>>
>> ____________
>> From: Cody Koeninger 
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
>>> the documentation
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've set up a simple Kafka DStream:
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" ->
>>>     classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" ->
>>>     classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>>> DecodedData](topics.toSeq, kafkaParameters)
>>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>>
>>>
>>> I then commit the offsets:
>>>
>>> var offsets: Array[OffsetRange] = Array()
>>> stream.foreachRDD(rdd => {
>>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>>> })
>>>
>>> // Future: Move this after we've done processing.
>>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>>
>>> The offsets appear to commit successfully. However, on restart the
>>> streaming application consistently starts from latest whenever the Spark
>>> checkpoint is changed.  Drilling into the code it does not appear that
>>> re-loading offset data is supported in the Spark Streaming Kafka library.
>>> How is this expected to work?  Is there an example of saving the offsets
>>> to Kafka and then loading them from Kafka?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey




Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Where is that called in the driver? The only call I see from Subscribe is to 
load the offset from checkpoint.



From: Cody Koeninger 
Sent: Thursday, June 14, 2018 4:24:58 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The code that loads offsets from kafka is in e.g.
org.apache.kafka.clients.consumer, it's not in spark.

On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from kafka, because the underlying kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
>> the documentation
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've set up a simple Kafka DStream:
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" ->
>>     classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" ->
>>     classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>> DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>> The offsets appear to commit successfully. However, on restart the
>> streaming application consistently starts from latest whenever the Spark
>> checkpoint is changed.  Drilling into the code it does not appear that
>> re-loading offset data is supported in the Spark Streaming Kafka library.
>> How is this expected to work?  Is there an example of saving the offsets
>> to Kafka and then loading them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey


Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Cody Koeninger
The code that loads offsets from kafka is in e.g.
org.apache.kafka.clients.consumer, it's not in spark.
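
(The plain 0.10 client can also read a group's stored offset directly via
committed(); note this only reads the stored value, it does not seat the
consumer's position the way subscribe-and-poll does. A minimal sketch,
assuming kafkaParams carries the streaming app's group.id and a "test"
topic:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
// committed() returns the last offset this group committed for the
// partition, or null if nothing has been committed yet.
val committed = consumer.committed(new TopicPartition("test", 0))
println(if (committed == null) "nothing committed" else s"committed: ${committed.offset}")
consumer.close())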

On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from kafka, because the underlying kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
>> the documentation
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've set up a simple Kafka DStream:
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" ->
>>     classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" ->
>>     classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>> DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>> The offsets appear to commit successfully. However, on restart the
>> streaming application consistently starts from latest whenever the Spark
>> checkpoint is changed.  Drilling into the code it does not appear that
>> re-loading offset data is supported in the Spark Streaming Kafka library.
>> How is this expected to work?  Is there an example of saving the offsets
>> to Kafka and then loading them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey




Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Can you point me to the code that loads offsets? As far as I can see with Spark 
2.1, the only offset load is from checkpoint.

Thank you!

Bryan



From: Cody Koeninger 
Sent: Thursday, June 14, 2018 4:00:31 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The expectation is that you shouldn't have to manually load offsets
from kafka, because the underlying kafka consumer on the driver will
start at the offsets associated with the given group id.

That's the behavior I see with this example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala

What does bin/kafka-consumer-groups.sh show for your group id?

On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey  wrote:
> Hello.
>
> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
> the documentation
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
> it appears that you can now use Kafka itself to store offsets.
>
> I've set up a simple Kafka DStream:
> val kafkaParameters = Map[String, String](
>   "metadata.broker.list" -> brokers,
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> false.toString,
>   "key.deserializer" ->
> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>   "partition.assignment.strategy" ->
> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>   "bootstrap.servers" -> brokersToUse.mkString(","),
>   "group.id" -> applicationName
> )
>
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>
>
> I then commit the offsets:
>
> var offsets: Array[OffsetRange] = Array()
> stream.foreachRDD(rdd => {
>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   logger.info(s"Offsets: ${offsets.mkString("|")}")
> })
>
> // Future: Move this after we've done processing.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
> The offsets appear to commit successfully. However, on restart the streaming
> application consistently starts from latest whenever the Spark checkpoint is
> changed.  Drilling into the code it does not appear that re-loading offset
> data is supported in the Spark Streaming Kafka library.  How is this
> expected to work?  Is there an example of saving the offsets to Kafka and
> then loading them from Kafka?
>
> Regards,
>
> Bryan Jeffrey


Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Cody Koeninger
The expectation is that you shouldn't have to manually load offsets
from kafka, because the underlying kafka consumer on the driver will
start at the offsets associated with the given group id.

That's the behavior I see with this example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala

What does bin/kafka-consumer-groups.sh show for your group id?
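
(The linked example commits each batch's offset ranges from inside
foreachRDD, after the batch's processing, which is what advances the
group's position in Kafka. Per the Spark 2.1 Kafka integration docs, the
shape is roughly:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch first, so a failure here means no commit ...
  // then commit this batch's ranges back to Kafka for the group id
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

commitAsync is called per batch on the stream returned by
createDirectStream; a single call at graph-construction time commits
nothing useful.)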

On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey  wrote:
> Hello.
>
> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
> the documentation
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
> it appears that you can now use Kafka itself to store offsets.
>
> I've set up a simple Kafka DStream:
> val kafkaParameters = Map[String, String](
>   "metadata.broker.list" -> brokers,
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> false.toString,
>   "key.deserializer" ->
> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>   "partition.assignment.strategy" ->
> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>   "bootstrap.servers" -> brokersToUse.mkString(","),
>   "group.id" -> applicationName
> )
>
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>
>
> I then commit the offsets:
>
> var offsets: Array[OffsetRange] = Array()
> stream.foreachRDD(rdd => {
>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   logger.info(s"Offsets: ${offsets.mkString("|")}")
> })
>
> // Future: Move this after we've done processing.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
> The offsets appear to commit successfully. However, on restart the streaming
> application consistently starts from latest whenever the Spark checkpoint is
> changed.  Drilling into the code it does not appear that re-loading offset
> data is supported in the Spark Streaming Kafka library.  How is this
> expected to work?  Is there an example of saving the offsets to Kafka and
> then loading them from Kafka?
>
> Regards,
>
> Bryan Jeffrey




Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Hello.

I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
on the documentation (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
it appears that you can now use Kafka itself to store offsets.

I've set up a simple Kafka DStream:
val kafkaParameters = Map[String, String](
  "metadata.broker.list" -> brokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> false.toString,
  "key.deserializer" ->
classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
  "partition.assignment.strategy" ->
classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
  "bootstrap.servers" -> brokersToUse.mkString(","),
  "group.id" -> applicationName
)

val consumerStrategy = ConsumerStrategies.Subscribe[String,
DecodedData](topics.toSeq, kafkaParameters)
KafkaUtils.createDirectStream(ssc, locationStrategy =
LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)


I then commit the offsets:

var offsets: Array[OffsetRange] = Array()
stream.foreachRDD(rdd => {
  offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  logger.info(s"Offsets: ${offsets.mkString("|")}")
})

// Future: Move this after we've done processing.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)

The offsets appear to commit successfully. However, on restart the
streaming application consistently starts from latest whenever the Spark
checkpoint is changed.  Drilling into the code it does not appear that
re-loading offset data is supported in the Spark Streaming Kafka library.
How is this expected to work?  Is there an example of saving the offsets to
Kafka and then loading them from Kafka?

Regards,

Bryan Jeffrey