[ 
https://issues.apache.org/jira/browse/STORM-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125129#comment-16125129
 ] 

Guang Du commented on STORM-2666:
---------------------------------

I grepped more logs below. To me seems Spout is resending after commit.

{code:java}
2017-07-27 23:55:41.100 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [INFO] Instantiated 
OffsetManager{topic-partition=kafka_bd_trigger_action-1, fetchOffset=17387, 
committedOffset=17386, emittedOffsets=[], ackedMsgs=[]}
2017-07-27 23:55:41.107 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] No offsets to commit. KafkaSpout{offsetManagers 
={kafka_bd_trigger_action-5=OffsetManager{topic-partition=kafka_bd_trigger_action-5,
 fetchOffset=17518, committedOffset=17517, emittedOffsets=[], ackedMsgs=[]}, 
kafka_bd_trigger_action-4=OffsetManager{topic-partition=kafka_bd_trigger_action-4,
 fetchOffset=15235, committedOffset=15234, emittedOffsets=[], ackedMsgs=[]}, 
kafka_bd_trigger_action-3=OffsetManager{topic-partition=kafka_bd_trigger_action-3,
 fetchOffset=16656, committedOffset=16655, emittedOffsets=[], ackedMsgs=[]}, 
kafka_bd_trigger_action-2=OffsetManager{topic-partition=kafka_bd_trigger_action-2,
 fetchOffset=17074, committedOffset=17073, emittedOffsets=[], ackedMsgs=[]}, 
kafka_bd_trigger_action-1=OffsetManager{topic-partition=kafka_bd_trigger_action-1,
 fetchOffset=17387, committedOffset=17386, emittedOffsets=[], ackedMsgs=[]}, 
kafka_bd_trigger_action-0=OffsetManager{topic-partition=kafka_bd_trigger_action-0,
 fetchOffset=14494, committedOffset=14493, emittedOffsets=[], ackedMsgs=[]}}, 
emitted=[]}
2017-07-27 23:55:42.078 c.q.d.s.h.HBaseDataLookupBolt 
Thread-53-pdl_lookup_hbase_bolt-executor[60 60] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17388, Key: null, Value: 
{"appId":"A20170727200514663","policyId":"PDL_BLACKLIST2","transactionId":"150116558246620452","timestamp":"20170727222705953","dataKey":"['nuanxindai',
 'BR_antifraudVerify|PULL', 'ZM_creditScore|PULL']"}
2017-07-27 23:55:42.132 c.q.d.s.h.HBaseDataLookupBolt 
Thread-23-pdl_lookup_hbase_bolt-executor[72 72] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17387, Key: null, Value: 
{"appId":"A20170727200514803","policyId":"PDL_TP","transactionId":"150116300744862685","timestamp":"20170727222427452","dataKey":"['nuanxindai',
 'HULU_tsp|DETAIL_PULL', 'HULU_tsp|SUMMARY_PUSH', 'ZM_creditScore|PULL', 
'QF_history|PULL', 'TD_creditReport|PULL']"}
2017-07-27 23:55:42.182 c.q.d.s.h.HBaseDataLookupBolt 
Thread-23-pdl_lookup_hbase_bolt-executor[72 72] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17390, Key: null, Value: 
{"appId":"A20170727200514953","policyId":"PDL_TP","transactionId":"150116342612677949","timestamp":"20170727222759470","dataKey":"['nuanxindai',
 'HULU_tsp|DETAIL_PULL', 'HULU_tsp|SUMMARY_PUSH', 'ZM_creditScore|PULL', 
'QF_history|PULL', 'TD_creditReport|PULL']"}
2017-07-27 23:55:42.233 c.q.d.s.h.HBaseDataLookupBolt 
Thread-9-pdl_lookup_hbase_bolt-executor[68 68] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17389, Key: null, Value: 
{"appId":"A20170727200514823","policyId":"PDL_TP","transactionId":"150116328342980579","timestamp":"20170727222715948","dataKey":"['nuanxindai',
 'HULU_tsp|DETAIL_PULL', 'HULU_tsp|SUMMARY_PUSH', 'ZM_creditScore|PULL', 
'QF_history|PULL', 'TD_creditReport|PULL']"}
2017-07-27 23:57:41.105 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17388, numFails=0}].
2017-07-27 23:57:41.105 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17388, numFails=0}].
2017-07-27 23:57:41.105 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17387, numFails=0}].
2017-07-27 23:57:41.105 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17387, numFails=0}].
2017-07-27 23:57:41.106 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17389, numFails=0}].
2017-07-27 23:57:41.106 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17389, numFails=0}].
2017-07-27 23:57:41.107 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17390, numFails=0}].
2017-07-27 23:57:41.107 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17390, numFails=0}].
2017-07-27 23:57:41.112 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Offsets successfully committed to Kafka 
[{kafka_bd_trigger_action-3=OffsetAndMetadata{offset=16656, 
metadata='{topic-partition=kafka_bd_trigger_action-3, offset=16656, numFails=0, 
hostname=IDC-HADOOPSH-01, thread='Thread-59-spout-executor[128 128]', 
timestamp=20170727235741}'}, 
kafka_bd_trigger_action-1=OffsetAndMetadata{offset=17390, 
metadata='{topic-partition=kafka_bd_trigger_action-1, offset=17390, numFails=0, 
hostname=IDC-HADOOPSH-01, thread='Thread-59-spout-executor[128 128]', 
timestamp=20170727235741}'}}]
2017-07-27 23:57:41.113 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [INFO] committing to offset 17390
2017-07-27 23:57:41.113 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [INFO] Committed offsets [17387-17390 = 4] for topic-partition 
[kafka_bd_trigger_action-1].
2017-07-27 23:57:41.125 c.q.d.s.h.HBaseDataLookupBolt 
Thread-25-pdl_lookup_hbase_bolt-executor[56 56] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17387, Key: null, Value: 
{"appId":"A20170727200514803","policyId":"PDL_TP","transactionId":"150116300744862685","timestamp":"20170727222427452","dataKey":"['nuanxindai',
 'HULU_tsp|DETAIL_PULL', 'HULU_tsp|SUMMARY_PUSH', 'ZM_creditScore|PULL', 
'QF_history|PULL', 'TD_creditReport|PULL']"}
2017-07-27 23:57:41.125 c.q.d.s.h.HBaseDataLookupBolt 
Thread-23-pdl_lookup_hbase_bolt-executor[72 72] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17388, Key: null, Value: 
{"appId":"A20170727200514663","policyId":"PDL_BLACKLIST2","transactionId":"150116558246620452","timestamp":"20170727222705953","dataKey":"['nuanxindai',
 'BR_antifraudVerify|PULL', 'ZM_creditScore|PULL']"}
2017-07-27 23:57:41.195 c.q.d.s.h.HBaseDataLookupBolt 
Thread-53-pdl_lookup_hbase_bolt-executor[60 60] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17390, Key: null, Value: 
{"appId":"A20170727200514953","policyId":"PDL_TP","transactionId":"150116342612677949","timestamp":"20170727222759470","dataKey":"['nuanxindai',
 'HULU_tsp|DETAIL_PULL', 'HULU_tsp|SUMMARY_PUSH', 'ZM_creditScore|PULL', 
'QF_history|PULL', 'TD_creditReport|PULL']"}
2017-07-27 23:57:41.219 c.q.d.s.h.HBaseDataLookupBolt 
Thread-9-pdl_lookup_hbase_bolt-executor[68 68] [INFO] Topic: 
kafka_bd_trigger_action, Partition: 1, Offset: 17389, Key: null, Value: 
{"appId":"A20170727200514823","policyId":"PDL_TP","transactionId":"150116328342980579","timestamp":"20170727222715948","dataKey":"['nuanxindai',
 'HULU_tsp|DETAIL_PULL', 'HULU_tsp|SUMMARY_PUSH', 'ZM_creditScore|PULL', 
'QF_history|PULL', 'TD_creditReport|PULL']"}
2017-07-27 23:57:41.328 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17388, numFails=0}].
2017-07-27 23:57:41.328 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17388, numFails=0}].
2017-07-27 23:57:42.135 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17387, numFails=0}].
2017-07-27 23:57:42.135 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17387, numFails=0}].
2017-07-27 23:57:44.150 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17390, numFails=0}].
2017-07-27 23:57:44.150 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17390, numFails=0}].
2017-07-27 23:57:50.200 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Received ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17389, numFails=0}].
2017-07-27 23:57:50.200 o.a.s.k.s.KafkaSpout Thread-59-spout-executor[128 128] 
[INFO] Successful ack for tuple message 
[{topic-partition=kafka_bd_trigger_action-1, offset=17389, numFails=0}].
2017-07-27 23:57:53.103 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [WARN] topic-partition [kafka_bd_trigger_action-1] has unexpected offset 
[17387]. Current committed Offset [17390]
2017-07-27 23:57:53.103 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [WARN] topic-partition [kafka_bd_trigger_action-1] has unexpected offset 
[17388]. Current committed Offset [17390]
2017-07-27 23:57:53.103 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [WARN] topic-partition [kafka_bd_trigger_action-1] has unexpected offset 
[17389]. Current committed Offset [17390]
2017-07-27 23:57:53.103 o.a.s.k.s.i.OffsetManager Thread-59-spout-executor[128 
128] [WARN] topic-partition [kafka_bd_trigger_action-1] has unexpected offset 
[17390]. Current committed Offset [17390]

{code}


> Kafka Client Spout send & ack committed offsets
> -----------------------------------------------
>
>                 Key: STORM-2666
>                 URL: https://issues.apache.org/jira/browse/STORM-2666
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
>
> Under a certain heavy load, for failed/timeout tuples, the retry service will 
> ack tuple for failed max times. Kafka Client Spout will commit after reached 
> the commit interval. However seems some 'on the way' tuples will be failed 
> again, the retry service will cause Spout to emit again, and acked eventually 
> to OffsetManager.
> In some cases such offsets are too many, exceeding the max-uncommit, causing 
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset 
> unable to find next commit point, and Spout for this partition will not poll 
> any more.
> By the way I've applied STORM-2549 PR#2156 from Stig Døssing to fix 
> STORM-2625, and I'm using Python Shell Bolt as processing bolt, if this 
> information helps.
> resulting logs like below. I'm not sure if the issue has already been 
> raised/fixed, glad if anyone could help to point out existing JIRA. Thank you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248 
> 248] [INFO] Successful ack for tuple message 
> [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager 
> Thread-23-spout-executor[248 248] [WARN] topic-partition 
> [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed 
> Offset [16003]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to