[https://issues.apache.org/jira/browse/SPARK-20780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746990#comment-16746990]
Chang Quanyou commented on SPARK-20780:
---------------------------------------
I used the same Maven configuration and hit a different problem.
Example:
1. Consumer group description:
PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
0          4745061         5643514         898453
1          4745062         5643515         898453
2          4745060         5643512         898452
3          4939461         5643514         704053
2. The current batch always consumes only partition 3; the other partitions
are not consumed.
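The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET for each partition. The following minimal Scala sketch recomputes the lag from the table and measures how far each partition's current offset is ahead of the slowest one. The `PartitionState` case class and the `ConsumerLag` object are hypothetical helpers for illustration, not part of Spark or Kafka.

```scala
// Hypothetical helper: one row of the consumer-group table above.
final case class PartitionState(partition: Int, currentOffset: Long, logEndOffset: Long) {
  // Lag = records written to the partition but not yet consumed/committed.
  def lag: Long = logEndOffset - currentOffset
}

object ConsumerLag {
  // Values copied from the consumer-group description above.
  val rows: Seq[PartitionState] = Seq(
    PartitionState(0, 4745061L, 5643514L),
    PartitionState(1, 4745062L, 5643515L),
    PartitionState(2, 4745060L, 5643512L),
    PartitionState(3, 4939461L, 5643514L)
  )

  // How far each partition's current offset is ahead of the slowest partition.
  def aheadOfSlowest(states: Seq[PartitionState]): Map[Int, Long] = {
    val minOffset = states.map(_.currentOffset).min
    states.map(s => s.partition -> (s.currentOffset - minOffset)).toMap
  }

  def main(args: Array[String]): Unit = {
    rows.foreach(r => println(s"partition ${r.partition}: lag ${r.lag}"))
    println(aheadOfSlowest(rows))
  }
}
```

With these numbers, partition 3 is 194,401 offsets ahead of partition 2 while partitions 0 to 2 are within 2 offsets of each other, which is consistent with only partition 3 making progress.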
> Spark Kafka10 Consumer Hangs
> ----------------------------
>
> Key: SPARK-20780
> URL: https://issues.apache.org/jira/browse/SPARK-20780
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.0
> Environment: Spark 2.1.0
> Spark Streaming Kafka 010
> Yarn - Cluster Mode
> CDH 5.8.4
> CentOS Linux release 7.2
> Reporter: jayadeepj
> Priority: Major
> Attachments: streaming_1.png, streaming_2.png, tasks_timing_out_3.png
>
>
> We recently upgraded our streaming app (Direct Stream) to Spark 2
> (spark-streaming-kafka-0-10 2.1.0) with Kafka 0.10.0.0 and the 0.10
> consumer. We see abnormal delays after the application has run for a couple
> of hours, or after it has consumed approximately 5 million records.
> See screenshots 1 and 2.
> There is a sudden jump in processing time from ~15 seconds (usual for this
> app) to ~3 minutes, and from then on the processing time keeps degrading.
> We have seen that the delay is caused by certain tasks taking exactly the
> configured Kafka consumer 'request.timeout.ms'. We have confirmed this by
> setting the timeout property to different values.
> See screenshot 3.
> I think the get(offset: Long, timeout: Long): ConsumerRecord[K, V] method
> and the subsequent poll(timeout) call in CachedKafkaConsumer.scala are
> timing out on some of the partitions without reading data, yet the executor
> logs the task as successfully completed after exactly the timeout duration.
> Note that most other tasks complete successfully within milliseconds. The
> timeout most likely comes from
> org.apache.kafka.clients.consumer.KafkaConsumer, and we did not observe any
> difference in network latency.
> We have observed this across multiple clusters and multiple apps, with and
> without TLS/SSL. Spark 1.6 with the 0-8 consumer seems to be fine, with
> consistent performance.
> 17/05/17 10:30:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446288
> 17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 (TID 446288)
> 17/05/17 10:30:06 INFO kafka010.KafkaRDD: Computing topic XX-XXX-XX, partition 0 offsets 776843 -> 779591
> 17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843
> 17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver
> 17/05/17 10:30:56 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446329
> 17/05/17 10:30:56 INFO executor.Executor: Running task 0.0 in stage 5667.0 (TID 446329)
> 17/05/17 10:30:56 INFO spark.MapOutputTrackerWorker: Updating epoch to 3116 and clearing cache
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 6807
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807_piece0 stored as bytes in memory (estimated size 13.1 KB, free 4.1 GB)
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 6807 took 4 ms
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807 stored as values in m
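> The timestamps of the "Running task 11.0" and "Finished task 11.0" log
> statements differ by exactly the timeout duration (10:30:06 to 10:30:56,
> i.e. 50 seconds, matching request.timeout.ms = 50000).
> Our consumer config is below.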
> 17/05/17 12:33:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1171dde4
> 17/05/17 12:33:13 INFO consumer.ConsumerConfig: ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [xxxxx.xxx.xxx:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = true
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id =
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 50000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = default1
> retry.backoff.ms = 100
> ssl.secure.random.implementation = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = latest
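The executor log in the description shows task 11.0 of stage 5663.0 starting at 10:30:06 and finishing at 10:30:56. The following minimal Scala sketch parses the timestamp prefix of those two log lines and compares the elapsed time with `request.timeout.ms = 50000` from the consumer config above. The `TaskTiming` object and its methods are hypothetical, for illustration only; it assumes log timestamps in the `yy/MM/dd HH:mm:ss` format seen in this log.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object TaskTiming {
  // Timestamp format used by the executor log lines above.
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Parse the first 17 characters (e.g. "17/05/17 10:30:06") of a log line.
  def timestamp(logLine: String): LocalDateTime =
    LocalDateTime.parse(logLine.take(17), fmt)

  // Wall-clock time between two log lines, in milliseconds.
  def elapsedMs(startLine: String, endLine: String): Long =
    Duration.between(timestamp(startLine), timestamp(endLine)).toMillis

  // True if a task's duration equals the consumer timeout within a tolerance
  // (the log timestamps only have one-second resolution).
  def matchesTimeout(elapsed: Long, requestTimeoutMs: Long, toleranceMs: Long = 1000L): Boolean =
    math.abs(elapsed - requestTimeoutMs) <= toleranceMs

  val start = "17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 (TID 446288)"
  val end = "17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver"

  def main(args: Array[String]): Unit = {
    val ms = elapsedMs(start, end)
    println(s"elapsed = $ms ms, matches request.timeout.ms: ${matchesTimeout(ms, 50000L)}")
  }
}
```

The elapsed time comes out to 50,000 ms, equal to the configured request.timeout.ms. With one-second log resolution this is consistent with, though not proof of, the timeout explanation given in the report.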
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)