[ https://issues.apache.org/jira/browse/SPARK-22782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
licun updated SPARK-22782:
--------------------------

    Description:

We use Spark Structured Streaming to consume Kafka, but we find the consumer speed is too slow compared to Spark Streaming. We set the Kafka option "maxOffsetsPerTrigger" to 100000000.

By adding a debug log in CachedKafkaConsumer.scala (kafka-0-10-sql), we found the following situation.

The debug code:

{code:scala}
private def poll(pollTimeoutMs: Long): Unit = {
  val startTime = System.currentTimeMillis()
  val p = consumer.poll(pollTimeoutMs)
  val r = p.records(topicPartition)
  val endTime = System.currentTimeMillis()
  val delta = endTime - startTime
  logError(s"Polled $groupId ${p.partitions()}, Size: ${r.size}, spend: ${delta} ms")
  fetchedData = r.iterator
}
{code}


> Boost speed, use kafka010 consumer kafka
> ----------------------------------------
>
>                 Key: SPARK-22782
>                 URL: https://issues.apache.org/jira/browse/SPARK-22782
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: kafka-version: 0.10
> spark-version: 2.2.0
> schedule spark on yarn
>            Reporter: licun
>            Priority: Critical
>
> We use Spark Structured Streaming to consume Kafka, but we find the consumer speed is too slow compared to Spark Streaming. We set the Kafka option "maxOffsetsPerTrigger" to 100000000.
> By adding a debug log in CachedKafkaConsumer.scala (kafka-0-10-sql), we found the following situation.
>
> The debug code:
>
> {code:scala}
> private def poll(pollTimeoutMs: Long): Unit = {
>   val startTime = System.currentTimeMillis()
>   val p = consumer.poll(pollTimeoutMs)
>   val r = p.records(topicPartition)
>   val endTime = System.currentTimeMillis()
>   val delta = endTime - startTime
>   logError(s"Polled $groupId ${p.partitions()}, Size: ${r.size}, spend: ${delta} ms")
>   fetchedData = r.iterator
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
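
The instrumentation in the debug code above follows a general wall-clock timing pattern: record System.currentTimeMillis() before and after the call being measured, then log the delta. A minimal, self-contained sketch of that pattern in plain Scala (no Kafka dependency; the object and helper names here are illustrative, not part of Spark):

```scala
object PollTiming {
  // Generic helper mirroring the instrumentation added to poll():
  // run a block, return its result together with the elapsed wall-clock time in ms.
  def timed[A](body: => A): (A, Long) = {
    val startTime = System.currentTimeMillis()
    val result = body
    val endTime = System.currentTimeMillis()
    (result, endTime - startTime)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for consumer.poll(pollTimeoutMs): sleep briefly instead.
    val (_, delta) = timed { Thread.sleep(50) }
    println(s"spend: ${delta} ms")
  }
}
```

In the actual debug code, `body` would be the `consumer.poll(pollTimeoutMs)` call, and the delta is what revealed the slow per-poll latency reported in this issue.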