[
https://issues.apache.org/jira/browse/STORM-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
miaoyichen updated STORM-1557:
------------------------------
Description:
Hello, I'm Felix.
When we used the Trident API and set Config.TOPOLOGY_MAX_SPOUT_PENDING to a
value greater than 1 (for example, 100), we found that the txids were not
contiguous.
This has two effects:
* When using the {color:red} OpaqueTridentKafkaSpout class {color}, we found that
{color:red} different txids got the same offset, so we received duplicate
data from Kafka {color}. Here are some logs we printed:
{noformat}
11:10:13.516 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:96,offset:1805,nextOffset:1824]
11:10:13.567 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:142,offset:1824,nextOffset:1843]
11:10:13.619 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:97,offset:1824,nextOffset:1843]
11:10:13.670 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:98,offset:1843,nextOffset:1862]
Please NOTICE that ids 142 and 97 got the same Kafka offset.
{noformat}
* When using the {color:red} TransactionalTridentKafkaSpout class {color}, the
txids were distinct and the offsets were contiguous, but processing {color:red}
was slow {color} because of some unnecessary loops in the function
MasterBatchCoordinator.sync(). The greater the value of
Config.TOPOLOGY_MAX_SPOUT_PENDING, the slower the processing. Here are some
logs we printed:
{noformat}
Config.TOPOLOGY_MAX_SPOUT_PENDING=100
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
14:05:00.337 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:135,offset:2546,nextOffset:2565]
14:05:00.483 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:136,offset:2565,nextOffset:2584]
14:05:00.495 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:197,offset:2584,nextOffset:2603]
14:05:03.550 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:198,offset:2603,nextOffset:2622]
14:05:03.593 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:199,offset:2622,nextOffset:2641]
Please NOTICE the timestamps and offsets from id 136 to id 198.
{noformat}
{noformat}
Config.TOPOLOGY_MAX_SPOUT_PENDING=1000
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
11:35:36.265 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:232,offset:228,nextOffset:247]
11:35:36.305 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:233,offset:247,nextOffset:266]
11:35:36.343 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:446,offset:266,nextOffset:285]
11:35:41.345 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:1266,offset:285,nextOffset:304]
11:35:47.063 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:1330,offset:304,nextOffset:323]
11:35:56.221 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter -
emit:[id:1447,offset:323,nextOffset:342]
Please note each log entry's timestamp, txid, and offset.
{noformat}
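The two symptoms in the logs above can be checked mechanically. The following is a minimal sketch, assuming the log payload always has the form emit:[id:...,offset:...,nextOffset:...] shown above; the class and method names (EmitLogCheck, duplicateOffsets, txidSteps) are hypothetical helpers for illustration and are not part of Storm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the debug logs; not part of Storm.
public class EmitLogCheck {
    // Matches the payload printed in the logs, e.g. emit:[id:96,offset:1805,nextOffset:1824]
    private static final Pattern EMIT =
        Pattern.compile("emit:\\[id:(\\d+),offset:(\\d+),nextOffset:(\\d+)\\]");

    /** Maps each start offset that was emitted by more than one txid to those txids. */
    public static Map<Long, List<Long>> duplicateOffsets(String log) {
        Map<Long, List<Long>> byOffset = new LinkedHashMap<>();
        Matcher m = EMIT.matcher(log);
        while (m.find()) {
            long txid = Long.parseLong(m.group(1));
            long offset = Long.parseLong(m.group(2));
            byOffset.computeIfAbsent(offset, k -> new ArrayList<>()).add(txid);
        }
        // Keep only offsets that were read by two or more txids.
        byOffset.values().removeIf(txids -> txids.size() < 2);
        return byOffset;
    }

    /** Differences between consecutive txids in emission order; all 1s means contiguous. */
    public static List<Long> txidSteps(String log) {
        List<Long> steps = new ArrayList<>();
        Matcher m = EMIT.matcher(log);
        Long prev = null;
        while (m.find()) {
            long txid = Long.parseLong(m.group(1));
            if (prev != null) {
                steps.add(txid - prev);
            }
            prev = txid;
        }
        return steps;
    }

    public static void main(String[] args) {
        String opaqueLog =
              "emit:[id:96,offset:1805,nextOffset:1824]\n"
            + "emit:[id:142,offset:1824,nextOffset:1843]\n"
            + "emit:[id:97,offset:1824,nextOffset:1843]\n"
            + "emit:[id:98,offset:1843,nextOffset:1862]\n";
        System.out.println(duplicateOffsets(opaqueLog)); // prints {1824=[142, 97]}
        System.out.println(txidSteps(opaqueLog));        // prints [46, -45, 1]
    }
}
```

Applied to the OpaqueTridentKafkaSpout log, this reports offset 1824 as read by both txid 142 and txid 97, and the txid steps show the jump forward and back.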
We think the txid allocation algorithm in the MasterBatchCoordinator class
should keep txids contiguous: the txid should be incremented ONLY when the time
window and the other conditions are ready.
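To illustrate the proposal, here is a minimal sketch of contiguous allocation. It is not the actual MasterBatchCoordinator code: the class ContiguousTxidSketch, its methods, and the readiness check passed in as a BooleanSupplier are all hypothetical stand-ins for the time window and the other conditions. The point is only that the allocator stops at the first txid that is not ready instead of moving past it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the proposed behaviour; not the real MasterBatchCoordinator.
public class ContiguousTxidSketch {
    private long nextTxid = 1;    // next txid to hand out
    private int active = 0;       // batches started but not yet completed
    private final int maxPending; // plays the role of TOPOLOGY_MAX_SPOUT_PENDING

    public ContiguousTxidSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    /**
     * Starts batches while there is room and the readiness check passes.
     * The txid advances only when a batch is actually started, so the
     * txids handed out are always contiguous.
     */
    public List<Long> sync(BooleanSupplier ready) {
        List<Long> started = new ArrayList<>();
        while (active < maxPending && ready.getAsBoolean()) {
            started.add(nextTxid);
            nextTxid++;
            active++;
        }
        return started;
    }

    /** Marks one active batch as finished, freeing a slot. */
    public void complete() {
        if (active > 0) {
            active--;
        }
    }
}
```

With maxPending set to 3 and a check that is always ready, the first sync() starts txids 1, 2 and 3, a second sync() starts nothing, and after one complete() the next sync() starts txid 4.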
> trident get repeat data from kafka
> -----------------------------------
>
> Key: STORM-1557
> URL: https://issues.apache.org/jira/browse/STORM-1557
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core
> Reporter: miaoyichen
> Assignee: miaoyichen
>