Re: Mac vs cluster Re: kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

2016-11-19 Thread Hster Geguri
Hi Cody,

Our test producer has been vetted to confirm it produces evenly into each
partition; we use kafka-manager to track this.

> $ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> $ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021
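
The same numbers can also be pulled programmatically if that is easier to
script against. A minimal sketch using the 0.10.1.0 consumer API (the
OffsetCheck name is made up for illustration; beginningOffsets and
endOffsets correspond to GetOffsetShell's --time -2 and --time -1):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "10.102.22.11:9092")
    props.put("group.id", "offset-check")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    val consumer = new KafkaConsumer[String, String](props)
    try {
      val parts = consumer.partitionsFor("simple_logtest").asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition)).asJava
      val earliest = consumer.beginningOffsets(parts).asScala // --time -2
      val latest = consumer.endOffsets(parts).asScala         // --time -1
      earliest.toSeq.sortBy(_._1.partition).foreach { case (tp, e) =>
        println(s"$tp earliest=$e latest=${latest(tp)}")
      }
    } finally consumer.close()
  }
}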


We have spent two weeks trying different configurations and stripping
everything down. The only thing we have not tried is a different cloud
provider; we are using GCE. Since previous versions work properly, as does
the "latest" offset setting, we did not think the problem was in the
infrastructure layer.

Thanks,
Heji


On Sat, Nov 19, 2016 at 9:27 AM, Cody Koeninger  wrote:

> This is running locally on my Mac, but it's still a standalone Spark
> master with multiple separate executor JVMs (i.e. using --master, not
> --local[2]), so it should exercise the same code paths.  I can't speak
> to YARN one way or the other, but you said you tried it with the
> standalone scheduler.
>
> At the very least, you should run ./bin/kafka-run-class.sh
> kafka.tools.GetOffsetShell with --time -1 (latest) and --time -2
> (earliest) and compare those results to what you're seeing from
> Spark.  The results you posted from Spark didn't show any incoming
> messages at all.
>
> On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
>  wrote:
> > Hi Cody,
> >
> > Thank you for testing this on a Saturday morning!  I failed to mention
> > that when our data engineer runs our drivers (even complex ones) locally
> > on his Mac, the drivers work fine. However, when we launch them into the
> > cluster (4 machines, either as a YARN cluster or Spark standalone) we
> > get this issue.
> >
> > Heji
> >
> >
> > On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger 
> wrote:
> >>
> >> I ran your example using the versions of Kafka and Spark you are
> >> using, against a standalone cluster.  This is what I observed:
> >>
> >> (in kafka working directory)
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> >> simple_logtest:2:0
> >> simple_logtest:4:0
> >> simple_logtest:1:0
> >> simple_logtest:3:0
> >> simple_logtest:0:0
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> >> simple_logtest:2:31
> >> simple_logtest:4:31
> >> simple_logtest:1:31
> >> simple_logtest:3:31
> >> simple_logtest:0:31
> >>
> >> So in other words, there are 5 partitions, and they all have messages
> >> in them.
> >>
> >> (in spark working directory)
> >>
> >> bash-3.2$ ./bin/spark-submit --master
> >> spark://Codys-MacBook-Pro.local:7077 --class
> >> example.SimpleKafkaLoggingDriver
> >>
> >> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
> >> localhost:9092 simple_logtest mygroup earliest
> >>
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >>
> >> simple_logtest 3 offsets: 0 to 31
> >> simple_logtest 0 offsets: 0 to 31
> >> simple_logtest 1 offsets: 0 to 31
> >> simple_logtest 2 offsets: 0 to 31
> >> simple_logtest 4 offsets: 0 to 31
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> >> 1479574025000 ms (execution: 0.005 s)
> >> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> >> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> >> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
> >> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> >> 1479574030000 ms.0 from job set of time 1479574030000 ms
> >>
> >> simple_logtest 3 offsets: 31 to 31
> >> simple_logtest 0 offsets: 31 to 31
> >> simple_logtest 1 offsets: 31 to 31
> >> simple_logtest 2 offsets: 31 to 31
> >> simple_logtest 4 offsets: 31 to 31
> >>
> >> So in other words, Spark is indeed seeing offsets for each partition.
> >>
> >>
> >> The results you posted look to me like there aren't any messages going
> >> into the other partitions, which suggests a misbehaving producer.
> >>
> >> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
> >>  wrote:
> >> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we
> >> > have been struggling with this showstopper problem.
> >> >
> >> > When we run our drivers with auto.offset.reset=latest, ingesting from a
> >> > single Kafka topic with 10 partitions, the driver reads correctly from
> >> > all 10 partitions.

Re: Mac vs cluster Re: kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

2016-11-19 Thread Cody Koeninger
This is running locally on my Mac, but it's still a standalone Spark
master with multiple separate executor JVMs (i.e. using --master, not
--local[2]), so it should exercise the same code paths.  I can't speak
to YARN one way or the other, but you said you tried it with the
standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell with --time -1 (latest) and --time -2
(earliest) and compare those results to what you're seeing from
Spark.  The results you posted from Spark didn't show any incoming
messages at all.
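
For reference, the per-partition "offsets: x to y" lines in the output
below come from logging offset ranges for each batch. Roughly like this,
assuming the spark-streaming-kafka-0-10 integration and a stream returned
by KafkaUtils.createDirectStream (a sketch, not the exact code):

import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  // The cast is only valid on the KafkaRDD itself, before any transformation.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} ${r.partition} offsets: ${r.fromOffset} to ${r.untilOffset}")
  }
}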

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
 wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers (even complex ones) locally on his
> Mac, the drivers work fine. However, when we launch them into the cluster
> (4 machines, either as a YARN cluster or Spark standalone) we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger  wrote:
>>
>> I ran your example using the versions of Kafka and Spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, and they all have messages in them.
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>>
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 1479574030000 ms.0 from job set of time 1479574030000 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, Spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which suggests a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>>  wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been struggling with this showstopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest, ingesting from a
>> > single Kafka topic with 10 partitions, the driver reads correctly from
>> > all 10 partitions.
>> >
>> > However, when we use auto.offset.reset=earliest, the driver will read
>> > only a single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being reset
>> > to different offset configurations (in the ListOffsetRequests below,
>> > timestamp=-2 means earliest and timestamp=-1 means latest) even though
>> > the consumer config correctly indicates auto.offset.reset=earliest.
>> >
> >> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 8 TRACE Sending ListOffsetRequest
> >> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> >> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 9 TRACE Sending ListOffsetRequest
> >> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> >> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
> >> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> >> 8 TRACE Received ListOffsetResponse
> >> >> 

Mac vs cluster Re: kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

2016-11-19 Thread Hster Geguri
Hi Cody,

Thank you for testing this on a Saturday morning!  I failed to mention that
when our data engineer runs our drivers (even complex ones) locally on his
Mac, the drivers work fine. However, when we launch them into the cluster
(4 machines, either as a YARN cluster or Spark standalone) we get this issue.
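
In case it helps to see what we are running, the driver is essentially
just the kafka010 direct stream with auto.offset.reset passed through
from the command line. A minimal sketch (a reconstruction against
spark-streaming-kafka-0-10_2.11:2.0.2, not the exact source of
SimpleKafkaLoggingDriver):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SimpleKafkaLoggingDriver {
  def main(args: Array[String]): Unit = {
    // Arguments match the spark-submit invocation quoted below:
    // <brokers> <topic> <groupId> <offsetReset>
    val Array(brokers, topic, groupId, offsetReset) = args

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val ssc = new StreamingContext(
      new SparkConf().setAppName("SimpleKafkaLoggingDriver"), Seconds(5))
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Set(topic), kafkaParams))

    stream.foreachRDD { rdd =>
      // Print the offset range each batch sees for every partition.
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
        println(s"${r.topic} ${r.partition} offsets: ${r.fromOffset} to ${r.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}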

Heji


On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger  wrote:

> I ran your example using the versions of Kafka and Spark you are
> using, against a standalone cluster.  This is what I observed:
>
> (in kafka working directory)
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> simple_logtest:2:31
> simple_logtest:4:31
> simple_logtest:1:31
> simple_logtest:3:31
> simple_logtest:0:31
>
> So in other words, there are 5 partitions, and they all have messages in them.
>
> (in spark working directory)
>
> bash-3.2$ ./bin/spark-submit --master
> spark://Codys-MacBook-Pro.local:7077 --class
> example.SimpleKafkaLoggingDriver
> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
> localhost:9092 simple_logtest mygroup earliest
>
>
> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
>
> simple_logtest 3 offsets: 0 to 31
> simple_logtest 0 offsets: 0 to 31
> simple_logtest 1 offsets: 0 to 31
> simple_logtest 2 offsets: 0 to 31
> simple_logtest 4 offsets: 0 to 31
>
> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> 1479574025000 ms (execution: 0.005 s)
> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> 1479574030000 ms.0 from job set of time 1479574030000 ms
>
> simple_logtest 3 offsets: 31 to 31
> simple_logtest 0 offsets: 31 to 31
> simple_logtest 1 offsets: 31 to 31
> simple_logtest 2 offsets: 31 to 31
> simple_logtest 4 offsets: 31 to 31
>
> So in other words, Spark is indeed seeing offsets for each partition.
>
>
> The results you posted look to me like there aren't any messages going
> into the other partitions, which suggests a misbehaving producer.
>
> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>  wrote:
> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
> > been struggling with this showstopper problem.
> >
> > When we run our drivers with auto.offset.reset=latest, ingesting from a
> > single Kafka topic with 10 partitions, the driver reads correctly from
> > all 10 partitions.
> >
> > However, when we use auto.offset.reset=earliest, the driver will read
> > only a single partition.
> >
> > When we turn on the debug logs, we sometimes see partitions being reset
> > to different offset configurations (in the ListOffsetRequests below,
> > timestamp=-2 means earliest and timestamp=-1 means latest) even though
> > the consumer config correctly indicates auto.offset.reset=earliest.
> >
> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> >> from broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> >> from broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >
> >
> >
>