[ 
https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399611#comment-16399611
 ] 

RAbreu commented on STORM-2994:
-------------------------------

Sorry, I had switched back to stom-kafka-client to 1.1.0 for a test and forgot 
to undo.

storm-kafka-clients 1.1.2 logs:

 

(Note: the message was expected not be emitted by the Spout, hence the "Not 
emitting null tuple")
{code:java}
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Skipping fetch for 
partition sample.topic-0 because the
re is an in-flight request to worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Completed receive from 
node 1, for key 1, received {throttle_time_ms=0,resp
onses=[{topic=sample.topic,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=38785}
,record_set=[(offset=38781,record=Record(magic = 0, attributes = 0, compression 
= NONE, crc = 1115540089, key = 3 bytes, value = 285 by
tes)), (offset=38782,record=Record(magic = 0, attributes = 0, compression = 
NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)),
(offset=38783,record=Record(magic = 0, attributes = 0, compression = NONE, crc 
= 1115540089, key = 3 bytes, value = 285 bytes)), (offs
et=38784,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 
1115540089, key = 3 bytes, value = 285 bytes))]}]}]}
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - 
Topic partitions with entries ready to be retried [{}
] 
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Adding fetched record 
for partition sample.topic-0 with
offset 38781 to buffered record list
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Received 4 records in 
fetch response for partition sample.topic-0 with offset 38781
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Returning fetched 
records at offset 38781 for assigned partition sample.topic-0 and update 
position to 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Ignoring fetched records 
for sample.topic-0 at offset 3
8781 since the current position is 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Added fetch request for 
partition sample.topic-0 at off
set 38785 to node worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Sending fetch for 
partitions [sample.topic-0] to broker
worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Sending 
{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,top
ics=[{topic=sample.topic,partitions=[{partition=0,fetch_offset=38785,max_bytes=1048576}]}]}
 to node 1.
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [4] records from 
Kafka
2018-03-14T23:06:50.778Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - 
Topic partitions with entries ready to be retried [{}
] 

2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple 
for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38781, 
NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, 
serialized value size = 285, key = 123, value = {"id":"100"})] as defined in 
configuration.
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for 
message [{topic-partition=sample.topic-0, offset=38781, numFails=0, 
emitted=false}], associated with null tuple
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - 
Topic partitions with entries ready to be retried [{}] 
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - 
Topic partitions with entries ready to be retried [{}] 
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple 
for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38782, 
NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, 
serialized value size = 285, key = 123, value = {"id":"100"})] as defined in 
configuration.
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for 
message [{topic-partition=sample.topic-0, offset=38782, numFails=0, 
emitted=false}], associated with null tuple


{code}
 

 

> KafkaSpout consumes messages but doesn't commit offsets
> -------------------------------------------------------
>
>                 Key: STORM-2994
>                 URL: https://issues.apache.org/jira/browse/STORM-2994
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0, 1.1.2
>            Reporter: RAbreu
>            Priority: Major
>
> A topology that consumes from two different Kafka clusters: 0.10.1.1 and 
> 0.10.2.1.
> Spouts consuming from 0.10.2.1 have a low lag (and regularly commit offsets) 
> The Spout that consumes from 0.10.1.1 exhibits either:
> 1- Unknown lag
> 2- Lag that increments as the Spout reads messages from Kafka
>  
> In DEBUG, Offset manager logs: "topic-partition has NO offsets ready to be 
> committed", despite continuing to consume messages.
> Several configuration tweaks were tried, including setting maxRetries to 1, 
> in case messages with a lower offset were being retried (logs didn't show it, 
> though)
> offsetCommitPeriodMs was also  lowered to no avail.
> The only configuration that works is to have 
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=true, but this is undesired   since 
> we lose processing guarantees.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to