Re: Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Hster Geguri
Hi Cody,

Our test producer has been vetted to confirm it produces evenly into each
partition.  We use kafka-manager to track this.

> $ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> $ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021
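
For reference, a minimal sketch of the kind of keyless test producer we mean
(the broker address, topic name and message count below are illustrative
placeholders, not our actual producer). With no key on the records, the 0.10
default partitioner round-robins them across partitions, which is the even
spread the offsets above reflect:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SimpleTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "10.102.22.11:9092")  // placeholder broker
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 0 until 1000) {
        // No key on the record, so the default partitioner spreads these round-robin
        producer.send(new ProducerRecord[String, String]("simple_logtest", s"message-$i"))
      }
    } finally {
      producer.close()
    }
  }
}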


We have spent two weeks trying different configurations and stripping
everything down.  The only thing we have not tried is a different cloud
provider; we are using GCE. Since previous versions work properly, as does
the "latest" offset setting, we did not think the problem was in the
infrastructure layer.

Thanks,
Heji


On Sat, Nov 19, 2016 at 9:27 AM, Cody Koeninger  wrote:

> This is running locally on my mac, but it's still a standalone spark
> master with multiple separate executor jvms (i.e. using --master not
> --local[2]), so it should be the same code paths.  I can't speak to
> yarn one way or the other, but you said you tried it with the
> standalone scheduler.
>
> At the very least, you should run ./bin/kafka-run-class.sh
> kafka.tools.GetOffsetShell  with -1 and -2 and compare those results
> to what you're seeing from spark.  The results you posted from spark
> didn't show any incoming messages at all.
>
> On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
>  wrote:
> > Hi Cody,
> >
> > Thank you for testing this on a Saturday morning!  I failed to mention
> that
> > when our data engineer runs our drivers(even complex ones) locally on his
> > Mac, the drivers work fine. However when we launch it into the cluster (4
> > machines either for a YARN cluster or spark standalone) we get this
> issue.
> >
> > Heji
> >
> >
> > On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger 
> wrote:
> >>
> >> I ran your example using the versions of kafka and spark you are
> >> using, against a standalone cluster.  This is what I observed:
> >>
> >> (in kafka working directory)
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> >> simple_logtest:2:0
> >> simple_logtest:4:0
> >> simple_logtest:1:0
> >> simple_logtest:3:0
> >> simple_logtest:0:0
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> >> simple_logtest:2:31
> >> simple_logtest:4:31
> >> simple_logtest:1:31
> >> simple_logtest:3:31
> >> simple_logtest:0:31
> >>
> >> So in other words, there are 5 partitions, they all have messages in
> them
> >>
> >> (in spark working directory)
> >>
> >> bash-3.2$ ./bin/spark-submit --master
> >> spark://Codys-MacBook-Pro.local:7077 --class
> >> example.SimpleKafkaLoggingDriver
> >>
> >> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
> >> localhost:9092 simple_logtest mygroup earliest
> >>
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >>
> >> simple_logtest 3 offsets: 0 to 31
> >> simple_logtest 0 offsets: 0 to 31
> >> simple_logtest 1 offsets: 0 to 31
> >> simple_logtest 2 offsets: 0 to 31
> >> simple_logtest 4 offsets: 0 to 31
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> >> 1479574025000 ms (execution: 0.005 s)
> >> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> >> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> >> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
> >> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> >> 1479574030000 ms.0 from job set of time 1479574030000 ms
> >>
> >> simple_logtest 3 offsets: 31 to 31
> >> simple_logtest 0 offsets: 31 to 31
> >> simple_logtest 1 offsets: 31 to 31
> >> simple_logtest 2 offsets: 31 to 31
> >> simple_logtest 4 offsets: 31 to 31
> >>
> >> So in other words, spark is indeed seeing offsets for each partition.
> >>
> >>
> >> The results you posted look to me like there aren't any messages going
> >> into the other partitions, which looks like a misbehaving producer.
> >>
> >> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
> >>  wrote:
> >> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we
> have
> >> > been
> >> > struggling with this show stopper problem.
> >> >
> >> > When we run our drivers with auto.offset.reset=latest ingesting from a
> >> > single kafka topic with 10 partitions, the driver reads correctly from
> >> > 

Re: Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
This is running locally on my mac, but it's still a standalone spark
master with multiple separate executor jvms (i.e. using --master not
--local[2]), so it should be the same code paths.  I can't speak to
yarn one way or the other, but you said you tried it with the
standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell  with -1 and -2 and compare those results
to what you're seeing from spark.  The results you posted from spark
didn't show any incoming messages at all.
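
If it helps, something like the sketch below does the equivalent of the -2 / -1
GetOffsetShell queries through the 0.10.1 consumer API (the object name, group
id and broker address are made up for illustration; adjust them and the topic),
so you can print earliest, latest and per-partition message counts in one place:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // adjust to your brokers
    props.put("group.id", "offset-check")             // throwaway group, nothing is committed
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = consumer.partitionsFor("simple_logtest").asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition))
      val earliest = consumer.beginningOffsets(partitions.asJava).asScala  // same as --time -2
      val latest = consumer.endOffsets(partitions.asJava).asScala          // same as --time -1
      for (tp <- partitions) {
        println(s"$tp earliest=${earliest(tp)} latest=${latest(tp)} count=${latest(tp) - earliest(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}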

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
 wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers(even complex ones) locally on his
> Mac, the drivers work fine. However when we launch it into the cluster (4
> machines either for a YARN cluster or spark standalone) we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger  wrote:
>>
>> I ran your example using the versions of kafka and spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, they all have messages in them
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>>
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 1479574030000 ms.0 from job set of time 1479574030000 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which looks like a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>>  wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been
>> > struggling with this show stopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest ingesting from a
>> > single kafka topic with 10 partitions, the driver reads correctly from
>> > all
>> > 10 partitions.
>> >
>> > However when we use auto.offset.reset=earliest, the driver will read
>> > only a
>> > single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being set to
>> > different offset configuration even though the consumer config correctly
>> > indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest
>> >> offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Sending ListOffsetRequest
>> >>
>> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Sending ListOffsetRequest
>> >>
>> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Received ListOffsetResponse
>> >>
>> >> 

Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Hster Geguri
Hi Cody,

Thank you for testing this on a Saturday morning!  I failed to mention that
when our data engineer runs our drivers (even complex ones) locally on his
Mac, the drivers work fine. However, when we launch them into the cluster (4
machines, either a YARN cluster or Spark standalone), we get this issue.

Heji


On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger  wrote:

> I ran your example using the versions of kafka and spark you are
> using, against a standalone cluster.  This is what I observed:
>
> (in kafka working directory)
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> simple_logtest:2:31
> simple_logtest:4:31
> simple_logtest:1:31
> simple_logtest:3:31
> simple_logtest:0:31
>
> So in other words, there are 5 partitions, they all have messages in them
>
> (in spark working directory)
>
> bash-3.2$ ./bin/spark-submit --master
> spark://Codys-MacBook-Pro.local:7077 --class
> example.SimpleKafkaLoggingDriver
> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
> localhost:9092 simple_logtest mygroup earliest
>
>
> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
>
> simple_logtest 3 offsets: 0 to 31
> simple_logtest 0 offsets: 0 to 31
> simple_logtest 1 offsets: 0 to 31
> simple_logtest 2 offsets: 0 to 31
> simple_logtest 4 offsets: 0 to 31
>
> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> 1479574025000 ms (execution: 0.005 s)
> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> 1479574030000 ms.0 from job set of time 1479574030000 ms
>
> simple_logtest 3 offsets: 31 to 31
> simple_logtest 0 offsets: 31 to 31
> simple_logtest 1 offsets: 31 to 31
> simple_logtest 2 offsets: 31 to 31
> simple_logtest 4 offsets: 31 to 31
>
> So in other words, spark is indeed seeing offsets for each partition.
>
>
> The results you posted look to me like there aren't any messages going
> into the other partitions, which looks like a misbehaving producer.
>
> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>  wrote:
> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
> been
> > struggling with this show stopper problem.
> >
> > When we run our drivers with auto.offset.reset=latest ingesting from a
> > single kafka topic with 10 partitions, the driver reads correctly from
> all
> > 10 partitions.
> >
> > However when we use auto.offset.reset=earliest, the driver will read
> only a
> > single partition.
> >
> > When we turn on the debug logs, we sometimes see partitions being set to
> > different offset configuration even though the consumer config correctly
> > indicates auto.offset.reset=earliest.
> >
> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> >> from broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> >> from broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >
> >
> >
> 

Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
I ran your example using the versions of kafka and spark you are
using, against a standalone cluster.  This is what I observed:

(in kafka working directory)

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -1
simple_logtest:2:31
simple_logtest:4:31
simple_logtest:1:31
simple_logtest:3:31
simple_logtest:0:31

So, in other words, there are 5 partitions and they all have messages in them.

(in spark working directory)

bash-3.2$ ./bin/spark-submit --master
spark://Codys-MacBook-Pro.local:7077 --class
example.SimpleKafkaLoggingDriver
/private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
localhost:9092 simple_logtest mygroup earliest


16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms

simple_logtest 3 offsets: 0 to 31
simple_logtest 0 offsets: 0 to 31
simple_logtest 1 offsets: 0 to 31
simple_logtest 2 offsets: 0 to 31
simple_logtest 4 offsets: 0 to 31

16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms
16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
1479574025000 ms (execution: 0.005 s)
16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
1479574030000 ms.0 from job set of time 1479574030000 ms

simple_logtest 3 offsets: 31 to 31
simple_logtest 0 offsets: 31 to 31
simple_logtest 1 offsets: 31 to 31
simple_logtest 2 offsets: 31 to 31
simple_logtest 4 offsets: 31 to 31

So in other words, spark is indeed seeing offsets for each partition.


The results you posted look to me like there aren't any messages going
into the other partitions, which looks like a misbehaving producer.

On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
 wrote:
> Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> struggling with this show stopper problem.
>
> When we run our drivers with auto.offset.reset=latest ingesting from a
> single kafka topic with 10 partitions, the driver reads correctly from all
> 10 partitions.
>
> However when we use auto.offset.reset=earliest, the driver will read only a
> single partition.
>
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configuration even though the consumer config correctly
> indicates auto.offset.reset=earliest.
>
>> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>> from broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>> from broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>
>
>
> I've enclosed below the completely stripped down trivial test driver that
> shows this behavior. We normally run with YARN 2.7.3 but have also tried
> running spark standalone mode which has the same behavior. Our drivers are
> normally java but we have tried the scala version which also has the same
> incorrect behavior. We have tried different LocationStrategies and partition
> assignment strategies all without success.  Any insight would be greatly
> appreciated.
>
> package com.x.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import 

kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-17 Thread Hster Geguri
Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
been struggling with this show-stopper problem.

When we run our drivers with auto.offset.reset=latest ingesting from a
single kafka topic with 10 partitions, the driver reads correctly from all
10 partitions.

However when we use auto.offset.reset=earliest, the driver will read only a
single partition.

When we turn on the debug logs, we sometimes see partitions being set to
different offset configurations even though the consumer config correctly
indicates auto.offset.reset=earliest.

> 8 DEBUG Resetting offset for *partition simple_test-8 to earliest offset*.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for *partition simple_test-9 to latest offset*.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> to broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> to broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> from broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> from broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> (org.apache.kafka.clients.consumer.internals.Fetcher)



I've enclosed below the completely stripped-down trivial test driver that
shows this behavior. We normally run on YARN 2.7.3 but have also tried
Spark standalone mode, which shows the same behavior. Our drivers are
normally Java, but we have tried the Scala version, which also shows the same
incorrect behavior. We have tried different LocationStrategies and
partition assignment strategies, all without success.  Any insight would be
greatly appreciated.

package com.x.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies


/**
  * This driver only pulls data from the stream and logs the per-partition
  * offset ranges, to isolate the single-partition bug.
  */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: SimpleTestDriver <brokers> <topic> <groupId> <offsetReset>")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List(topic)

    // Consumer settings passed straight through to the underlying Kafka consumer
    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val dstream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    dstream.foreachRDD { rdd =>
      // Get the offset ranges in the RDD and log them, one line per partition
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
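
For completeness, a hypothetical variant of the stream setup above (a sketch
only, not something we have verified; it reuses topic, topics, kafkaParams,
preferredHosts and streamingContext from the driver, and the partition count of
10 and starting offset 0 are assumptions) that pins explicit per-partition
starting offsets via the three-argument ConsumerStrategies.Subscribe, so the
first batch would not depend on auto.offset.reset at all:

import org.apache.kafka.common.TopicPartition

// Assumed: 10 partitions, all starting at offset 0. The real earliest offsets
// could be looked up first, e.g. with KafkaConsumer.beginningOffsets.
val fromOffsets: Map[TopicPartition, Long] =
  (0 until 10).map(p => new TopicPartition(topic, p) -> 0L).toMap

val dstreamExplicit = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))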



16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = simple_test_group
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576