miaoyichen created STORM-1557:
---------------------------------

             Summary: trident get repeat data from kafka 
                 Key: STORM-1557
                 URL: https://issues.apache.org/jira/browse/STORM-1557
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-core
            Reporter: miaoyichen
            Assignee: miaoyichen


Hello, I'm Felix.

When we used the Trident API and set Config.TOPOLOGY_MAX_SPOUT_PENDING to a
value greater than 1 (say, 100), we found that the txids were not contiguous.

That phenomenon has two effects:

* When using the OpaqueTridentKafkaSpout class, we found that different txids
got the same offset, which led to repeated data being read from Kafka. We
printed some logs here:

11:10:13.516 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:96,offset:1805,nextOffset:1824]
11:10:13.567 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:142,offset:1824,nextOffset:1843]
11:10:13.619 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:97,offset:1824,nextOffset:1843]
11:10:13.670 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:98,offset:1843,nextOffset:1862]
Please NOTICE that ids 142 and 97 got the same Kafka offset.

* When using the TransactionalTridentKafkaSpout class, we got distinct txids
and contiguous offset values, but processing was slow because of some
unnecessary loops in the function MasterBatchCoordinator.sync(). The greater
Config.TOPOLOGY_MAX_SPOUT_PENDING is, the slower the processing. We printed
some logs here:

Config.TOPOLOGY_MAX_SPOUT_PENDING=100
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50

14:05:00.337 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:135,offset:2546,nextOffset:2565]
14:05:00.483 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:136,offset:2565,nextOffset:2584]
14:05:00.495 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:197,offset:2584,nextOffset:2603]
14:05:03.550 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:198,offset:2603,nextOffset:2622]
14:05:03.593 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:199,offset:2622,nextOffset:2641]
Please NOTICE the timestamps and offsets from id 136 to 198: the txid jumps
from 136 to 197, and about 3 seconds pass between ids 197 and 198.

Config.TOPOLOGY_MAX_SPOUT_PENDING=1000
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
11:35:36.265 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:232,offset:228,nextOffset:247]
11:35:36.305 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:233,offset:247,nextOffset:266]
11:35:36.343 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:446,offset:266,nextOffset:285]
11:35:41.345 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:1266,offset:285,nextOffset:304]
11:35:47.063 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:1330,offset:304,nextOffset:323]
11:35:56.221 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:1447,offset:323,nextOffset:342]
Please notice the timestamp, txid and offset of every log line.
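
The duplicate-offset observation in the OpaqueTridentKafkaSpout log above can
be checked mechanically. The following is a minimal sketch, not part of Storm:
the class name EmitLogCheck and its method are hypothetical, and it assumes
each log entry has been joined back onto a single line in the
emit:[id:...,offset:...,nextOffset:...] format printed above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Storm): finds start offsets that were
// handed to more than one txid in the emit log lines.
class EmitLogCheck {
    // Matches the payload format of the log lines quoted in this report.
    private static final Pattern EMIT =
        Pattern.compile("emit:\\[id:(\\d+),offset:(\\d+),nextOffset:(\\d+)\\]");

    // Maps each start offset to the txids that emitted it, keeping only
    // offsets that appear under at least two txids.
    static Map<Long, List<Long>> duplicateOffsets(List<String> lines) {
        Map<Long, List<Long>> byOffset = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = EMIT.matcher(line);
            if (m.find()) {
                long txid = Long.parseLong(m.group(1));
                long offset = Long.parseLong(m.group(2));
                byOffset.computeIfAbsent(offset, k -> new ArrayList<>()).add(txid);
            }
        }
        byOffset.values().removeIf(txids -> txids.size() < 2);
        return byOffset;
    }

    public static void main(String[] args) {
        // The four entries from the OpaqueTridentKafkaSpout log above.
        List<String> lines = List.of(
            "emit:[id:96,offset:1805,nextOffset:1824]",
            "emit:[id:142,offset:1824,nextOffset:1843]",
            "emit:[id:97,offset:1824,nextOffset:1843]",
            "emit:[id:98,offset:1843,nextOffset:1862]");
        System.out.println(duplicateOffsets(lines)); // {1824=[142, 97]}
    }
}
```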


We think the txid allocation algorithm in the MasterBatchCoordinator class
should keep txids contiguous: a txid should be advanced ONLY when the time
window and the other conditions are satisfied.
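
To illustrate the difference, here is a toy model of the two allocation
policies. It is not Storm's MasterBatchCoordinator code: the class
TxidAllocationSketch, the everyNth readiness check (a stand-in for the time
window) and the absence of commits are all assumptions made for illustration.
The "scan" policy keeps moving to the next txid even when the current one is
not ready, which is how we assume the gaps arise; the "contiguous" policy
stops at the first txid that is not ready.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Toy model only (not Storm code): compares two ways of handing out txids.
class TxidAllocationSketch {
    // Hypothetical readiness check: true on every n-th call, standing in
    // for a batch-emit time window that opens only occasionally.
    static BooleanSupplier everyNth(int n) {
        int[] calls = {0};
        return () -> ++calls[0] % n == 0;
    }

    // Runs up to `rounds` sync passes and returns txids in emission order.
    // Nothing is ever committed, so the window is always [1, maxPending].
    static List<Long> allocate(int maxPending, int rounds,
                               BooleanSupplier ready, boolean contiguous) {
        Set<Long> active = new HashSet<>();
        List<Long> emitted = new ArrayList<>();
        for (int r = 0; r < rounds && active.size() < maxPending; r++) {
            long curr = 1;
            for (int i = 0; i < maxPending; i++, curr++) {
                if (active.contains(curr)) {
                    continue; // already emitted, look at the next txid
                }
                if (ready.getAsBoolean()) {
                    active.add(curr);
                    emitted.add(curr);
                } else if (contiguous) {
                    break; // wait here; never skip past a txid that is not ready
                }
                // scan policy: fall through and try the next txid anyway
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(allocate(5, 9, everyNth(3), false)); // [3, 1, 5, 2, 4]
        System.out.println(allocate(5, 9, everyNth(3), true));  // [1, 2, 3, 4]
    }
}
```

In this model the scan policy emits txids out of order, while the contiguous
policy always emits them in increasing order with no gaps.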



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
