Heji Kim created SPARK-18506:
--------------------------------

             Summary: Kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest 
will only read from a single partition on a multi-partition topic
                 Key: SPARK-18506
                 URL: https://issues.apache.org/jira/browse/SPARK-18506
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.0.2
         Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
standalone mode 2.0.2 
with Kafka 0.10.1.0.   
            Reporter: Heji Kim


Our team is trying to upgrade to Spark 2.0.2/Kafka 
0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our 
drivers to read all partitions of a single stream when the Kafka consumer is 
configured with auto.offset.reset=earliest.

When we run our drivers with auto.offset.reset=latest, ingesting from a single 
Kafka topic with multiple partitions (usually 10, but the problem shows up with 
as few as 3), the driver reads correctly from all partitions. 
Unfortunately, we need "earliest" for exactly-once semantics.

In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
spark-streaming-kafka-0-8_2.11 with the prior setting 
auto.offset.reset=smallest runs correctly.

We tried the following configurations to isolate the problem, but only 
auto.offset.reset=earliest triggers it.
1. Ran with a Spark standalone cluster instead of YARN 2.7.3. The 
single-partition read problem persists in both cases.
2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
3. Turned off checkpointing. Problem persists with or without checkpointing.
4. Turned off backpressure. Problem persists with or without backpressure.
5. Tried both partition.assignment.strategy RangeAssignor and 
RoundRobinAssignor. Broken with both.
6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
both.
7. Tried the simplest Scala driver that only logs. (Our team uses Java.) 
Broken in both Scala and Java.
8. Tried increasing GCE capacity for the cluster, although we were already 
highly overprovisioned for cores and memory. Also tried ramping up executors 
and cores. Since the driver works with auto.offset.reset=latest, we have ruled 
out GCP cloud infrastructure issues.


When we turn on the debug logs, we sometimes see different partitions being 
reset with different offset strategies, even though the consumer config 
correctly indicates auto.offset.reset=earliest. 
{noformat}
8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}
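
In Kafka's ListOffsetRequest, a timestamp of -2 asks for the earliest offset and -1 for the latest, which matches the trace above: partition 8 is answered with offset 0 and partition 9 with offset 66724. The following is a minimal sketch for scanning debug logs for such mismatches, assuming each log entry sits on a single line. The object and method names are hypothetical helpers, not part of Kafka or Spark.

```scala
object OffsetResetLogCheck {
  // Sentinel timestamps carried by Kafka's ListOffsetRequest.
  val EarliestTimestamp = -2L
  val LatestTimestamp = -1L

  // Matches the Fetcher DEBUG line; unanchored so any log prefix is ignored.
  private val ResetLine =
    """Resetting offset for partition (\S+) to (earliest|latest) offset""".r.unanchored

  /** Maps a ListOffsetRequest timestamp to the strategy it asks for. */
  def describe(timestamp: Long): String = timestamp match {
    case EarliestTimestamp => "earliest"
    case LatestTimestamp   => "latest"
    case _                 => "lookup by timestamp"
  }

  /** Returns partition -> observed strategy for resets that differ from `expected`. */
  def mismatchedResets(lines: Seq[String], expected: String): Map[String, String] =
    lines.collect {
      case ResetLine(partition, strategy) if strategy != expected => partition -> strategy
    }.toMap

  def main(args: Array[String]): Unit = {
    // Two lines taken from the debug output above.
    val sample = Seq(
      "8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.",
      "9 DEBUG Resetting offset for partition simple_test-9 to latest offset."
    )
    mismatchedResets(sample, "earliest").foreach { case (partition, strategy) =>
      println(s"$partition was reset to $strategy, expected earliest")
    }
  }
}
```

Run against the two sample lines, this reports only simple_test-9, the partition that was reset to "latest" despite the configured "earliest".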

I've enclosed below the completely stripped down trivial test driver that shows 
this behavior.  Any insight would be greatly appreciated.

{code}
package com.xxxxx.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies


/**
  * This driver only pulls data from the stream and logs the offset ranges,
  * just to isolate the single-partition bug.
  */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List(topic)

    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName(s"SimpleTestDriver_$topic")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))


    val dstream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    dstream.foreachRDD { rdd =>
      // Get the offset ranges in the RDD and log
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()

  }

}
{code}

{noformat}
16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = simple_test_group
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{noformat}

Below is the output of the above driver for a 5-partition topic. Offsets always 
remain 0 for all but a single partition, in this case partition 3.

{noformat}
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531 

{noformat}
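
To spot this pattern in longer runs, the driver's output can be checked mechanically. The following is a minimal sketch, assuming the "topic partition offsets: from to until" line format printed by the driver above. The object, case class, and method names are hypothetical.

```scala
object StalledPartitionCheck {
  final case class LoggedRange(topic: String, partition: Int, from: Long, until: Long)

  // One line of driver output, e.g. "simple_logtest 3 offsets: 1623531 to 1623531".
  private val OutputLine = """(\S+) (\d+) offsets: (\d+) to (\d+)\s*""".r

  /** Parses driver output, silently skipping lines that do not match the format. */
  def parse(lines: Seq[String]): Seq[LoggedRange] =
    lines.collect {
      case OutputLine(topic, partition, from, until) =>
        LoggedRange(topic, partition.toInt, from.toLong, until.toLong)
    }

  /** Partitions whose untilOffset was 0 in every logged batch, in ascending order. */
  def stuckAtZero(ranges: Seq[LoggedRange]): Seq[Int] =
    ranges.groupBy(_.partition).collect {
      case (partition, batches) if batches.forall(_.until == 0L) => partition
    }.toSeq.sorted

  def main(args: Array[String]): Unit = {
    // First batch of the output shown above.
    val sample = Seq(
      "simple_logtest 3 offsets: 1623531 to 1623531",
      "simple_logtest 0 offsets: 0 to 0",
      "simple_logtest 1 offsets: 0 to 0",
      "simple_logtest 2 offsets: 0 to 0",
      "simple_logtest 4 offsets: 0 to 0"
    )
    println(s"Partitions stuck at offset 0: ${stuckAtZero(parse(sample)).mkString(", ")}")
  }
}
```

For the output above this flags partitions 0, 1, 2, and 4. A partition that is genuinely empty would be flagged as well, so the result only indicates a problem when the topic is known to hold data in every partition.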



