This is running locally on my Mac, but it's still a standalone Spark master with multiple separate executor JVMs (i.e. submitting with --master spark://..., not --master local[2]), so it should exercise the same code paths. I can't speak to YARN one way or the other, but you said you had tried it with the standalone scheduler as well.
At the very least, you should run ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell with --time -1 and --time -2 and compare those results to what you're seeing from Spark. The results you posted from Spark didn't show any incoming messages at all.

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri <hster.investiga...@gmail.com> wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning! I failed to mention that when our data engineer runs our drivers (even complex ones) locally on his Mac, they work fine. However, when we launch them into the cluster (4 machines, either as a YARN cluster or Spark standalone), we get this issue.
>
> Heji
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I ran your example using the versions of Kafka and Spark you are using, against a standalone cluster. This is what I observed:
>>
>> (in the Kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, and they all have messages in them.
>>
>> (in the Spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master spark://Codys-MacBook-Pro.local:7077 --class example.SimpleKafkaLoggingDriver /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar localhost:9092 simple_logtest mygroup earliest
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job 1479574030000 ms.0 from job set of time 1479574030000 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, Spark is indeed seeing offsets for each partition.
>>
>> The results you posted look to me like there aren't any messages going into the other partitions, which looks like a misbehaving producer.
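One quick way to rule the producer in or out: write a batch of keyless test records to the topic and re-run GetOffsetShell with --time -1. With the default partitioner in the 0.10.x producer, records with a null key should be spread round-robin over the available partitions, so the latest offset ought to advance for every partition. A rough sketch along those lines (the broker list, topic name, and message count are placeholders rather than values from this setup):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Minimal keyless test producer: with the 0.10.x default partitioner, a null key
// means round-robin partition assignment, so every partition should receive data.
object SimpleTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker list
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 1 to 100) {
        // No key is given, so the default partitioner round-robins the records.
        producer.send(new ProducerRecord[String, String]("simple_logtest", s"test message $i"))
      }
    } finally {
      producer.close()   // close() flushes any records still buffered
    }
  }
}

If the --time -1 offsets for some partitions still don't move after a run like this, the problem is on the produce side; if they all advance and the Spark output still reports a single partition, the consumer side is back under suspicion.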
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri <hster.investiga...@gmail.com> wrote:
>> > Our team is trying to upgrade to Spark 2.0.2 / Kafka 0.10.1.0, and we have been struggling with this showstopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest, ingesting from a single Kafka topic with 10 partitions, the driver reads correctly from all 10 partitions.
>> >
>> > However, when we use auto.offset.reset=earliest, the driver will read only a single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being set to different offset configurations even though the consumer config correctly indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
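The timestamp=-2 and timestamp=-1 values in those ListOffsetRequest lines are Kafka's sentinel values for the earliest and latest offset, the same numbers GetOffsetShell accepts via --time, so the trace really does show partition 8 being reset to earliest while partition 9 is reset to latest, despite auto.offset.reset=earliest. A tiny sketch that just prints the sentinels, assuming the kafka_2.11 core jar (the one GetOffsetShell ships in) is on the classpath:

import kafka.api.OffsetRequest

// The "timestamps" in the ListOffsetRequest trace lines are these sentinels,
// not real timestamps: -2 asks for the earliest available offset, -1 for the latest.
object OffsetSentinels extends App {
  println(s"earliest sentinel = ${OffsetRequest.EarliestTime}") // prints -2
  println(s"latest sentinel   = ${OffsetRequest.LatestTime}")   // prints -1
}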
>> > I've enclosed below the completely stripped-down, trivial test driver that shows this behavior. We normally run with YARN 2.7.3 but have also tried Spark standalone mode, which has the same behavior. Our drivers are normally Java, but we have tried the Scala version, which also shows the same incorrect behavior. We have tried different LocationStrategies and partition assignment strategies, all without success. Any insight would be greatly appreciated.
>> >
>> > package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>> >
>> > import org.apache.kafka.common.serialization.StringDeserializer
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.streaming.{Seconds, StreamingContext}
>> > import org.apache.spark.streaming.kafka010._
>> > import org.apache.spark.streaming.kafka010.LocationStrategies
>> > import org.apache.spark.streaming.kafka010.ConsumerStrategies
>> >
>> > /**
>> >  * This driver only pulls data from the stream and logs the offset ranges,
>> >  * just to isolate the single-partition bug.
>> >  */
>> > object SimpleKafkaLoggingDriver {
>> >   def main(args: Array[String]) {
>> >     if (args.length != 4) {
>> >       System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>> >       System.exit(1)
>> >     }
>> >
>> >     val Array(brokers, topic, groupId, offsetReset) = args
>> >     val preferredHosts = LocationStrategies.PreferConsistent
>> >     val topics = List(topic)
>> >
>> >     val kafkaParams = Map(
>> >       "bootstrap.servers" -> brokers,
>> >       "key.deserializer" -> classOf[StringDeserializer],
>> >       "value.deserializer" -> classOf[StringDeserializer],
>> >       "group.id" -> groupId,
>> >       "auto.offset.reset" -> offsetReset,
>> >       "enable.auto.commit" -> (false: java.lang.Boolean)
>> >     )
>> >
>> >     val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
>> >     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>> >
>> >     val dstream = KafkaUtils.createDirectStream[String, String](
>> >       streamingContext,
>> >       preferredHosts,
>> >       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>> >
>> >     dstream.foreachRDD { rdd =>
>> >       // Get the offset ranges in the RDD and log them
>> >       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >       for (o <- offsetRanges) {
>> >         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>> >       }
>> >     }
>> >
>> >     streamingContext.start()
>> >     streamingContext.awaitTermination()
>> >   }
>> > }
>> >
>> >> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
>> >>     auto.commit.interval.ms = 5000
>> >>     auto.offset.reset = earliest
>> >>     bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
>> >>     check.crcs = true
>> >>     client.id =
>> >>     connections.max.idle.ms = 540000
>> >>     enable.auto.commit = false
>> >>     exclude.internal.topics = true
>> >>     fetch.max.bytes = 52428800
>> >>     fetch.max.wait.ms = 500
>> >>     fetch.min.bytes = 1
>> >>     group.id = simple_test_group
>> >>     heartbeat.interval.ms = 3000
>> >>     interceptor.classes = null
>> >>     key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> >>     max.partition.fetch.bytes = 1048576
>> >>     max.poll.interval.ms = 300000
>> >>     max.poll.records = 500
>> >>     metadata.max.age.ms = 300000
>> >>     metric.reporters = []
>> >>     metrics.num.samples = 2
>> >>     metrics.sample.window.ms = 30000
>> >>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> >>     receive.buffer.bytes = 65536
>> >>     reconnect.backoff.ms = 50
>> >>     request.timeout.ms = 305000
>> >>     retry.backoff.ms = 100
>> >>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >>     sasl.kerberos.min.time.before.relogin = 60000
>> >>     sasl.kerberos.service.name = null
>> >>     sasl.kerberos.ticket.renew.jitter = 0.05
>> >>     sasl.kerberos.ticket.renew.window.factor = 0.8
>> >>     sasl.mechanism = GSSAPI
>> >>     security.protocol = PLAINTEXT
>> >>     send.buffer.bytes = 131072
>> >>     session.timeout.ms = 10000
>> >>     ssl.cipher.suites = null
>> >>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >>     ssl.endpoint.identification.algorithm = null
>> >>     ssl.key.password = null
>> >>     ssl.keymanager.algorithm = SunX509
>> >>     ssl.keystore.location = null
>> >>     ssl.keystore.password = null
>> >>     ssl.keystore.type = JKS
>> >>     ssl.protocol = TLS
>> >>     ssl.provider = null
>> >>     ssl.secure.random.implementation = null
>> >>     ssl.trustmanager.algorithm = PKIX
>> >>     ssl.truststore.location = null
>> >>     ssl.truststore.password = null
>> >>     ssl.truststore.type = JKS
>> >>     value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> >
>> > Below is the output of the above driver for a 5-partition topic. The offsets always remain 0 for all but a single partition, in this case partition 3.
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> > simple_logtest 0 offsets: 0 to 0
>> > simple_logtest 1 offsets: 0 to 0
>> > simple_logtest 2 offsets: 0 to 0
>> > simple_logtest 4 offsets: 0 to 0
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
>> > simple_logtest 0 offsets: 0 to 0
>> > simple_logtest 1 offsets: 0 to 0
>> > simple_logtest 2 offsets: 0 to 0
>> > simple_logtest 4 offsets: 0 to 0
>> >
>> > simple_logtest 3 offsets: 1623531 to 1623531
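Since the topic behaves with latest but not earliest, it can also help to take Spark out of the picture and ask the brokers directly, through the same 0.10.1 consumer API the direct stream uses, for each partition's earliest and latest offsets; this is essentially GetOffsetShell --time -2 / --time -1 done programmatically. A minimal sketch, assuming the beginningOffsets/endOffsets calls available in the 0.10.1 consumer, with placeholder broker list, topic, and group id:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Queries earliest and latest offsets per partition without involving Spark,
// using the same new-consumer client that the kafka010 direct stream uses.
object PartitionOffsetCheck {
  def main(args: Array[String]): Unit = {
    val topic = "simple_logtest"                       // placeholder topic
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker list
    props.put("group.id", "offset_check")              // placeholder, not used for the lookups
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = consumer.partitionsFor(topic).asScala
        .map(info => new TopicPartition(topic, info.partition)).asJava
      val earliest = consumer.beginningOffsets(partitions).asScala
      val latest = consumer.endOffsets(partitions).asScala
      for (tp <- earliest.keys.toSeq.sortBy(_.partition)) {
        println(s"$topic ${tp.partition}: earliest=${earliest(tp)} latest=${latest(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}

If this reports earliest=0 and a nonzero latest for every partition while the driver still logs nonzero ranges only for partition 3, the skew is being introduced on the consumer/driver side; if it shows the same lopsided numbers, the data only ever landed in one partition and the producer is the place to look.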