miaoyichen created STORM-1557:
---------------------------------
Summary: trident get repeat data from kafka
Key: STORM-1557
URL: https://issues.apache.org/jira/browse/STORM-1557
Project: Apache Storm
Issue Type: Bug
Components: storm-core
Reporter: miaoyichen
Assignee: miaoyichen
Hello, I'm Felix.
When we used the Trident API and set Config.TOPOLOGY_MAX_SPOUT_PENDING to a value
greater than 1 (say 100), we found that the txids were not contiguous.
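For reference, here is a minimal sketch of the kind of topology we ran (the
ZooKeeper address, topic name, topology name, and the Debug filter are
illustrative placeholders, not the exact code):
{code:java}
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Debug;

public class TxidRepro {
    public static void main(String[] args) throws Exception {
        // Kafka spout config; ZooKeeper address and topic are placeholders.
        TridentKafkaConfig spoutConf =
                new TridentKafkaConfig(new ZkHosts("localhost:2181"), "test-topic");
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Plain pass-through topology so that only the spout/coordinator matters.
        TridentTopology topology = new TridentTopology();
        topology.newStream("kafka-stream", new OpaqueTridentKafkaSpout(spoutConf))
                .each(new Fields("str"), new Debug());

        Config conf = new Config();
        conf.setMaxSpoutPending(100); // Config.TOPOLOGY_MAX_SPOUT_PENDING

        StormSubmitter.submitTopology("txid-repro", conf, topology.build());
    }
}
{code}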
This discontinuity has two effects:
* When using the OpaqueTridentKafkaSpout class, we found that different txids
were assigned the same offset, so we received duplicate data from Kafka. We
printed some logs here:
11:10:13.516 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:96,offset:1805,nextOffset:1824]
11:10:13.567 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:142,offset:1824,nextOffset:1843]
11:10:13.619 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:97,offset:1824,nextOffset:1843]
11:10:13.670 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:98,offset:1843,nextOffset:1862]
Please NOTICE that id 142 and id 97 got the same Kafka offset.
* When using the TransactionalTridentKafkaSpout class, we got distinct txids and
contiguous offsets, but processing was slow because of unnecessary loops in
MasterBatchCoordinator.sync(), and the greater Config.TOPOLOGY_MAX_SPOUT_PENDING,
the slower it got (a sketch of this setup follows the logs below). We printed
some logs here:
Config.TOPOLOGY_MAX_SPOUT_PENDING=100
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
14:05:00.337 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:135,offset:2546,nextOffset:2565]
14:05:00.483 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:136,offset:2565,nextOffset:2584]
14:05:00.495 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:197,offset:2584,nextOffset:2603]
14:05:03.550 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:198,offset:2603,nextOffset:2622]
14:05:03.593 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:199,offset:2622,nextOffset:2641]
Please NOTICE the timestamps and offsets from id 136 through id 198.
Config.TOPOLOGY_MAX_SPOUT_PENDING=1000
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
11:35:36.265 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:232,offset:228,nextOffset:247]
11:35:36.305 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:233,offset:247,nextOffset:266]
11:35:36.343 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:446,offset:266,nextOffset:285]
11:35:41.345 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:1266,offset:285,nextOffset:304]
11:35:47.063 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:1330,offset:304,nextOffset:323]
11:35:56.221 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - emit:[id:1447,offset:323,nextOffset:342]
Please notice every log line's timestamp, txid and offset.
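The TransactionalTridentKafkaSpout runs above used the same topology as the
sketch earlier; only the spout class and the config values changed. Roughly (a
continuation of that sketch, not the exact code we ran):
{code:java}
// Same topology as the sketch above, but with the transactional spout
// (storm.kafka.trident.TransactionalTridentKafkaSpout).
topology.newStream("kafka-stream", new TransactionalTridentKafkaSpout(spoutConf))
        .each(new Fields("str"), new Debug());

// The two configurations compared in the logs above:
conf.setMaxSpoutPending(100);     // first run
// conf.setMaxSpoutPending(1000); // second run
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
{code}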
We think the txid assignment algorithm in the MasterBatchCoordinator class needs
to follow a continuity principle: ONLY when the emit time window and the other
conditions are ready should the next txid be issued.
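To illustrate the idea (hypothetical names only; this is NOT the real
MasterBatchCoordinator code), something along these lines: a txid is only
consumed when the batch can actually be emitted, so the ids stay contiguous.
{code:java}
// Conceptual sketch of the proposed behavior, with made-up names.
class ContiguousTxIdIssuer {
    private long lastIssuedTxId = 0;

    /**
     * Hand out the next txid only when the batch can really be emitted:
     * a free slot under max-spout-pending, the emit interval has elapsed,
     * and the coordinator reports ready. Otherwise no txid is consumed,
     * so the sequence of issued txids stays contiguous.
     */
    Long tryIssueNext(int activeBatches, int maxSpoutPending,
                      boolean emitIntervalElapsed, boolean coordinatorReady) {
        if (activeBatches < maxSpoutPending && emitIntervalElapsed && coordinatorReady) {
            return ++lastIssuedTxId;  // advance the txid only on an actual emit
        }
        return null;                  // not ready: wait, do not skip/burn a txid
    }
}
{code}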