GitHub user mycFelix opened a pull request:
https://github.com/apache/storm/pull/1041
make the txid continuous and bug fixed
Hello, I'm Felix.
When we use the Trident API and set Config.TOPOLOGY_MAX_SPOUT_PENDING to a
value greater than 1 (say 100), we find that the txids are not contiguous.
This has two effects:
* With the TransactionalTridentKafkaSpout class, we get different txids
and contiguous offset values, but processing is slow because of some
unnecessary loops in MasterBatchCoordinator.sync(). The greater
Config.TOPOLOGY_MAX_SPOUT_PENDING is, the slower processing becomes.
Here are some logs we printed:
````
Config.TOPOLOGY_MAX_SPOUT_PENDING=100
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
14:05:00.337 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:135,offset:2546,nextOffset:2565]
14:05:00.483 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:136,offset:2565,nextOffset:2584]
14:05:00.495 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:197,offset:2584,nextOffset:2603]
14:05:03.550 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:198,offset:2603,nextOffset:2622]
14:05:03.593 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:199,offset:2622,nextOffset:2641]
Please note the timestamps, txids, and offsets from id 136 to id 198.
Config.TOPOLOGY_MAX_SPOUT_PENDING=1000
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
11:35:36.265 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:232,offset:228,nextOffset:247]
11:35:36.305 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:233,offset:247,nextOffset:266]
11:35:36.343 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:446,offset:266,nextOffset:285]
11:35:41.345 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:1266,offset:285,nextOffset:304]
11:35:47.063 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:1330,offset:304,nextOffset:323]
11:35:56.221 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:1447,offset:323,nextOffset:342]
Please note the timestamp, txid, and offset of every log line.
````
* With the OpaqueTridentKafkaSpout class, we found that different txids
got the same offset value, so we read duplicate messages from Kafka.
Here are some logs we printed:
````
11:10:13.516 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:96,offset:1805,nextOffset:1824]
11:10:13.567 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:142,offset:1824,nextOffset:1843]
11:10:13.619 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:97,offset:1824,nextOffset:1843]
11:10:13.670 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:98,offset:1843,nextOffset:1862]
Please note that ids 142 and 97 got the same Kafka offset.
````
We think the txid allocation algorithm in the MasterBatchCoordinator class
should keep txids contiguous: it should advance to the next txid ONLY when
the time window and the other conditions are satisfied.
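
To illustrate the contiguity principle described above, here is a minimal,
standalone sketch. It is not the code of this pull request and not Storm's
MasterBatchCoordinator: the class TxidAllocationSketch, the method allocate,
and the readiness predicate are all hypothetical names chosen for the example.
It only contrasts an allocation loop that skips over a txid that is not ready
with one that stops at it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical illustration only; none of these names come from Storm.
class TxidAllocationSketch {

    // Examines up to maxActive candidate txids starting at firstTxid and
    // returns the txids for which a new batch would be started.
    // "ready" stands in for whatever conditions gate a new batch
    // (time window, spout readiness, ...); its definition is an assumption.
    static List<Long> allocate(long firstTxid, int maxActive, Set<Long> active,
                               LongPredicate ready, boolean contiguous) {
        List<Long> started = new ArrayList<>();
        long curr = firstTxid;
        for (int i = 0; i < maxActive; i++) {
            if (!active.contains(curr)) {
                if (ready.test(curr)) {
                    active.add(curr);
                    started.add(curr);
                } else if (contiguous) {
                    // Stop at the first txid that cannot start yet, so that
                    // no later txid is handed out ahead of it.
                    break;
                }
                // In the non-contiguous variant the loop simply moves on,
                // which leaves a gap at curr.
            }
            curr++;
        }
        return started;
    }

    public static void main(String[] args) {
        // Assumed readiness rule for the demo: txid 3 is not ready yet.
        LongPredicate allButThree = txid -> txid != 3;
        System.out.println(allocate(1, 5, new HashSet<>(), allButThree, false));
        System.out.println(allocate(1, 5, new HashSet<>(), allButThree, true));
    }
}
```

With the demo predicate, the first call starts txids 1, 2, 4 and 5 (a gap at
3), while the second starts only 1 and 2 and leaves 3 as the next candidate.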
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mycFelix/storm 0.9.x-branch
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/1041.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1041
----
commit 3abcd55a9b8105c7c5eccfd882fbca35fa16ce21
Author: mycFelix <[email protected]>
Date: 2016-01-25T11:23:53Z
make the txid continuous
----