This is running locally on my Mac, but it's still a standalone spark
master with multiple separate executor jvms (i.e. using --master with a
spark:// URL, not --master local[2]), so it should be the same code
paths.  I can't speak to yarn one way or the other, but you said you
tried it with the standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell with --time -1 (latest offsets) and
--time -2 (earliest offsets) and compare those results to what you're
seeing from spark.  The results you posted from spark didn't show any
incoming messages at all.
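
If it helps, that comparison can be scripted.  What follows is only a
rough sketch: OffsetDiff, parse and messageCounts are made-up names
(not part of Kafka or Spark), and it assumes the
topic:partition:offset output format seen in the GetOffsetShell runs
quoted further down.

```scala
// Hypothetical helper, not part of Kafka or Spark: diffs two GetOffsetShell outputs.
object OffsetDiff {
  // Parse "topic:partition:offset" lines into a partition -> offset map.
  // Assumes one offset per line (the GetOffsetShell default).
  def parse(output: String): Map[Int, Long] =
    output.split("\n").map(_.trim).filter(_.nonEmpty).map { line =>
      val fields = line.split(":")
      // The last two fields are the partition and the offset.
      fields(fields.length - 2).toInt -> fields.last.toLong
    }.toMap

  // Messages currently held per partition: latest (--time -1) minus earliest (--time -2).
  def messageCounts(earliest: String, latest: String): Map[Int, Long] = {
    val start = parse(earliest)
    parse(latest).map { case (p, end) => p -> (end - start.getOrElse(p, 0L)) }
  }

  def main(args: Array[String]): Unit = {
    // Sample data copied from the GetOffsetShell output in this thread.
    val earliest = Seq(2, 4, 1, 3, 0).map(p => s"simple_logtest:$p:0").mkString("\n")
    val latest = Seq(2, 4, 1, 3, 0).map(p => s"simple_logtest:$p:31").mkString("\n")
    messageCounts(earliest, latest).toSeq.sortBy(_._1).foreach { case (p, n) =>
      println(s"partition $p: $n messages")
    }
  }
}
```

A partition that comes out with a count of 0 has nothing in it to read,
whatever auto.offset.reset is set to, so that is the first thing to
rule out before looking at the spark side.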

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
<hster.investiga...@gmail.com> wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers (even complex ones) locally on his
> Mac, the drivers work fine. However, when we launch them into the cluster (4
> machines, either a YARN cluster or spark standalone), we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I ran your example using the versions of kafka and spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, and each one holds 31
>> messages (earliest offset 0, latest offset 31).
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 1479574030000 ms.0 from job set of time 1479574030000 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which looks like a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>> <hster.investiga...@gmail.com> wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been struggling with this showstopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest ingesting from a
>> > single kafka topic with 10 partitions, the driver reads correctly from
>> > all 10 partitions.
>> >
>> > However, when we use auto.offset.reset=earliest, the driver will read
>> > only a single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being reset
>> > with different policies (some to earliest, some to latest) even though
>> > the consumer config correctly indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >
>> >
>> >
>> > I've enclosed below the completely stripped-down, trivial test driver
>> > that shows this behavior. We normally run with YARN 2.7.3 but have also
>> > tried running spark standalone mode, which has the same behavior. Our
>> > drivers are normally Java, but we have tried the Scala version, which
>> > also has the same incorrect behavior. We have tried different
>> > LocationStrategies and partition assignment strategies, all without
>> > success.  Any insight would be greatly appreciated.
>> >
>> > package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>> >
>> > import org.apache.kafka.common.serialization.StringDeserializer
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.streaming.{Seconds, StreamingContext}
>> > import org.apache.spark.streaming.kafka010._
>> > import org.apache.spark.streaming.kafka010.LocationStrategies
>> > import org.apache.spark.streaming.kafka010.ConsumerStrategies
>> >
>> >
>> > /**
>> >   * This driver only pulls data from the stream and logs it to output,
>> >   * to isolate the single-partition bug.
>> >   */
>> > object SimpleKafkaLoggingDriver {
>> >   def main(args: Array[String]): Unit = {
>> >     if (args.length != 4) {
>> >       System.err.println(
>> >         "Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>> >       System.exit(1)
>> >     }
>> >
>> >     val Array(brokers, topic, groupId, offsetReset) = args
>> >     val preferredHosts = LocationStrategies.PreferConsistent
>> >     val topics = List(topic)
>> >
>> >     val kafkaParams = Map(
>> >       "bootstrap.servers" -> brokers,
>> >       "key.deserializer" -> classOf[StringDeserializer],
>> >       "value.deserializer" -> classOf[StringDeserializer],
>> >       "group.id" -> groupId,
>> >       "auto.offset.reset" -> offsetReset,
>> >       "enable.auto.commit" -> (false: java.lang.Boolean)
>> >     )
>> >
>> >     val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
>> >     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>> >
>> >
>> >     val dstream = KafkaUtils.createDirectStream[String, String](
>> >       streamingContext,
>> >       preferredHosts,
>> >       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>> >
>> >     dstream.foreachRDD { rdd =>
>> >       // Get the offset ranges in the RDD and log
>> >       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >       for (o <- offsetRanges) {
>> >         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>> >       }
>> >     }
>> >
>> >     streamingContext.start()
>> >     streamingContext.awaitTermination()
>> >
>> >   }
>> >
>> > }
>> >
>> >
>> >
>> >> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
>> >> auto.commit.interval.ms = 5000
>> >> auto.offset.reset = earliest
>> >> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
>> >> check.crcs = true
>> >> client.id =
>> >> connections.max.idle.ms = 540000
>> >> enable.auto.commit = false
>> >> exclude.internal.topics = true
>> >> fetch.max.bytes = 52428800
>> >> fetch.max.wait.ms = 500
>> >> fetch.min.bytes = 1
>> >> group.id = simple_test_group
>> >> heartbeat.interval.ms = 3000
>> >> interceptor.classes = null
>> >> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> >> max.partition.fetch.bytes = 1048576
>> >> max.poll.interval.ms = 300000
>> >> max.poll.records = 500
>> >> metadata.max.age.ms = 300000
>> >> metric.reporters = []
>> >> metrics.num.samples = 2
>> >> metrics.sample.window.ms = 30000
>> >> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> >> receive.buffer.bytes = 65536
>> >> reconnect.backoff.ms = 50
>> >> request.timeout.ms = 305000
>> >> retry.backoff.ms = 100
>> >> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >> sasl.kerberos.min.time.before.relogin = 60000
>> >> sasl.kerberos.service.name = null
>> >> sasl.kerberos.ticket.renew.jitter = 0.05
>> >> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >> sasl.mechanism = GSSAPI
>> >> security.protocol = PLAINTEXT
>> >> send.buffer.bytes = 131072
>> >> session.timeout.ms = 10000
>> >> ssl.cipher.suites = null
>> >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >> ssl.endpoint.identification.algorithm = null
>> >> ssl.key.password = null
>> >> ssl.keymanager.algorithm = SunX509
>> >> ssl.keystore.location = null
>> >> ssl.keystore.password = null
>> >> ssl.keystore.type = JKS
>> >> ssl.protocol = TLS
>> >> ssl.provider = null
>> >> ssl.secure.random.implementation = null
>> >> ssl.trustmanager.algorithm = PKIX
>> >> ssl.truststore.location = null
>> >> ssl.truststore.password = null
>> >> ssl.truststore.type = JKS
>> >> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> >
>> >
>> >
>> > Below is the output of the above driver for a 5-partition topic.  Offsets
>> > always remain 0 for all but a single partition, in this case partition 3.
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> > simple_logtest 0 offsets: 0 to 0
>> > simple_logtest 1 offsets: 0 to 0
>> > simple_logtest 2 offsets: 0 to 0
>> > simple_logtest 4 offsets: 0 to 0
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> > simple_logtest 0 offsets: 0 to 0
>> > simple_logtest 1 offsets: 0 to 0
>> > simple_logtest 2 offsets: 0 to 0
>> > simple_logtest 4 offsets: 0 to 0
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> >
>> >
>> >
>> >
>
>
