[ 
https://issues.apache.org/jira/browse/STORM-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

miaoyichen updated STORM-1557:
------------------------------
    Description: 
When we use the Trident API and set Config.TOPOLOGY_MAX_SPOUT_PENDING greater than 
1 (for example, 100), we find that txids are not assigned contiguously.

This has two effects:

* When using the {color:red} OpaqueTridentKafkaSpout class {color}, we found that 
{color:red} different txids got the same offset, which led to repeated data being 
read from Kafka {color}. Some of the logs we printed:

{noformat}
11:10:13.516 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:96,offset:1805,nextOffset:1824]
11:10:13.567 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:142,offset:1824,nextOffset:1843]
11:10:13.619 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:97,offset:1824,nextOffset:1843]
11:10:13.670 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:98,offset:1843,nextOffset:1862]

Note that ids 142 and 97 got the same Kafka offset.
{noformat}

* When using the {color:red} TransactionalTridentKafkaSpout class {color}, we got 
distinct txids and contiguous offsets, but {color:red} processing was slow 
{color} because of some unnecessary loops in the function 
MasterBatchCoordinator.sync(). The greater Config.TOPOLOGY_MAX_SPOUT_PENDING is, 
the slower processing becomes. Some of the logs we printed:

{noformat}
Config.TOPOLOGY_MAX_SPOUT_PENDING=100
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50

14:05:00.337 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:135,offset:2546,nextOffset:2565]
14:05:00.483 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:136,offset:2565,nextOffset:2584]
14:05:00.495 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:197,offset:2584,nextOffset:2603]
14:05:03.550 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:198,offset:2603,nextOffset:2622]
14:05:03.593 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:199,offset:2622,nextOffset:2641]

Note the timestamps and offsets from id 136 to id 198.
{noformat}


{noformat}
Config.TOPOLOGY_MAX_SPOUT_PENDING=1000
Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS=50
11:35:36.265 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:232,offset:228,nextOffset:247]
11:35:36.305 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:233,offset:247,nextOffset:266]
11:35:36.343 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:446,offset:266,nextOffset:285]
11:35:41.345 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:1266,offset:285,nextOffset:304]
11:35:47.063 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:1330,offset:304,nextOffset:323]
11:35:56.221 [Thread-15-spout0] ERROR s.kafka.trident.TridentKafkaEmitter - 
emit:[id:1447,offset:323,nextOffset:342]

Note each log line's timestamp, txid, and offset.
{noformat}
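
The duplicate offsets in the first log excerpt can also be found mechanically. The following is only a minimal sketch: the class name DuplicateOffsetCheck and its method are hypothetical helpers, not part of Storm, and the sketch assumes the log lines keep the "emit:[id:..,offset:..,nextOffset:..]" shape shown above. It groups txids by start offset and reports offsets that were emitted under more than one txid.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Storm): scans emitter log lines for
// start offsets that were emitted under more than one txid.
class DuplicateOffsetCheck {
    // Matches the "emit:[id:..,offset:..,nextOffset:..]" fragment from the logs.
    private static final Pattern EMIT =
        Pattern.compile("emit:\\[id:(\\d+),offset:(\\d+),nextOffset:(\\d+)\\]");

    // Returns start offset -> txids, keeping only offsets seen with 2+ txids.
    static Map<Long, List<Long>> duplicates(List<String> lines) {
        Map<Long, List<Long>> byOffset = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = EMIT.matcher(line);
            if (m.find()) {
                long txid = Long.parseLong(m.group(1));
                long offset = Long.parseLong(m.group(2));
                byOffset.computeIfAbsent(offset, k -> new ArrayList<>()).add(txid);
            }
        }
        // Drop offsets that were emitted exactly once.
        byOffset.values().removeIf(txids -> txids.size() < 2);
        return byOffset;
    }

    public static void main(String[] args) {
        // The four emit fragments from the OpaqueTridentKafkaSpout excerpt.
        List<String> lines = List.of(
            "emit:[id:96,offset:1805,nextOffset:1824]",
            "emit:[id:142,offset:1824,nextOffset:1843]",
            "emit:[id:97,offset:1824,nextOffset:1843]",
            "emit:[id:98,offset:1843,nextOffset:1862]");
        System.out.println(duplicates(lines)); // prints {1824=[142, 97]}
    }
}
```

Run against the first excerpt, this flags offset 1824 as shared by txids 142 and 97, which matches what we observed by hand.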

We think txid allocation in the MasterBatchCoordinator class should follow a 
contiguity principle: the txid should be advanced ONLY when the time window and 
the other conditions are ready.
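
To illustrate the principle, here is a minimal sketch of an allocator that only advances its counter when a txid is actually handed out. It is not the MasterBatchCoordinator code and not the change in the pull request; the class ContiguousTxidAllocator and its fields are hypothetical, and the two conditions it checks (a pending limit and an emit interval) are assumptions modelled on Config.TOPOLOGY_MAX_SPOUT_PENDING and Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS.

```java
// Hypothetical sketch (not Storm code): txids stay contiguous because the
// counter is incremented only when every condition for emitting holds.
class ContiguousTxidAllocator {
    private final int maxPending;          // assumed analogue of max spout pending
    private final long emitIntervalMillis; // assumed analogue of the batch emit interval
    private long nextTxid = 1;
    private int pending = 0;
    private long lastEmitMillis = 0;
    private boolean emittedOnce = false;

    ContiguousTxidAllocator(int maxPending, long emitIntervalMillis) {
        this.maxPending = maxPending;
        this.emitIntervalMillis = emitIntervalMillis;
    }

    // Returns the next txid, or -1 if the conditions are not ready.
    // A refused call leaves nextTxid untouched, so no txid is skipped.
    long tryNext(long nowMillis) {
        boolean intervalElapsed =
            !emittedOnce || nowMillis - lastEmitMillis >= emitIntervalMillis;
        if (pending >= maxPending || !intervalElapsed) {
            return -1;
        }
        emittedOnce = true;
        lastEmitMillis = nowMillis;
        pending++;
        return nextTxid++;
    }

    // Marks one in-flight batch as finished, freeing a pending slot.
    void complete() {
        if (pending > 0) {
            pending--;
        }
    }

    public static void main(String[] args) {
        ContiguousTxidAllocator alloc = new ContiguousTxidAllocator(100, 50);
        // Poll every 10 ms; a txid is handed out only when the interval has elapsed.
        for (long t = 0; t <= 200; t += 10) {
            long txid = alloc.tryNext(t);
            if (txid != -1) {
                System.out.println("t=" + t + "ms txid=" + txid);
            }
        }
    }
}
```

With this shape, raising the pending limit changes how many batches may be in flight but never causes gaps in the txid sequence.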

{color:red}I submitted a solution on pull request #1041 
{color}[https://github.com/apache/storm/pull/1041]


> trident get repeat data from kafka 
> -----------------------------------
>
>                 Key: STORM-1557
>                 URL: https://issues.apache.org/jira/browse/STORM-1557
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>            Reporter: miaoyichen
>            Assignee: miaoyichen



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
