[ 
https://issues.apache.org/jira/browse/STORM-124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Kellogg updated STORM-124:
-------------------------------
    Component/s: storm-core

> Tuple Lost in Trident Topology on Remote Cluster, while the same topology 
> works correctly on local cluster
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-124
>                 URL: https://issues.apache.org/jira/browse/STORM-124
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/535
> I have created a Trident topology. While it runs correctly on a local 
> cluster, it produces random results when deployed on a remote cluster due 
> to tuple loss in Trident.
> A code example follows. ExtractFunction parses each Kafka message and 
> creates 4 tuples (of 5 fields each) from its content; CustomReduceAggregator 
> then aggregates based on the first 4 fields.
> I generated 800 Kafka messages during the test, so ExtractFunction is 
> expected to receive 800 tuples and emit 800*4=3200 tuples for 
> CustomReduceAggregator. This works correctly on a local cluster.
> On the remote cluster, although ExtractFunction emitted 3200 tuples, 
> CustomReduceAggregator always receives only a few hundred of them, so the 
> result is incorrect.
> Is this a bug in Storm/Trident, or is some configuration needed for my 
> Trident topology?
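> The report does not include ExtractFunction itself. As a minimal sketch of 
> the parsing step it describes — assuming, hypothetically, that each Kafka 
> message carries 4 records separated by ';', each with 5 comma-separated 
> fields — the core logic could look like this (in a real topology it would 
> live inside a Trident BaseFunction, emitting one Values per record):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ExtractFunction's parsing step; the real message
// format is not shown in the report. Assumption: each message holds 4
// records separated by ';', each record having 5 comma-separated fields.
public class ExtractSketch {
    public static List<String[]> extractTuples(String message) {
        List<String[]> tuples = new ArrayList<>();
        for (String record : message.split(";")) {
            String[] fields = record.split(",");
            if (fields.length == 5) {  // keep only well-formed records
                tuples.add(fields);
            }
        }
        return tuples;
    }
}
```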
>     TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(new 
> ZkHosts(zkServer+":"+zkPort, zkBrokerPath), kafkaTopic);
>     tridentKafkaConfig.forceStartOffsetTime(-2);
>     TridentTopology topology=new TridentTopology();
>     TridentState tridentState = 
> topology.newStream("kafkaSpout"+System.currentTimeMillis(), new 
> TransactionalTridentKafkaSpout(tridentKafkaConfig))
>         .parallelismHint(1)
>         .each(new Fields("bytes"), new ExtractFunction(), new Fields("a", 
> "b", "c", "b", "e"))
>         .parallelismHint(1)
>         .groupBy(new Fields("a", "b", "c", "f"))
>         .persistentAggregate(new 
> CassandraMapState.Factory(StateType.TRANSACTIONAL, "cf_1"),  
>                 new Fields("a", "b", "c", "b", "e"), 
>                 new CustomReduceAggregator(), 
>                 new Fields("value"))
>         .parallelismHint(1);
>     topology.newDRPCStream("profile", localDRPC)
>         .each(new Fields("args"), new DrpcParamSplitFunction(), new 
> Fields("a", "b", "c", "b"))
>         .groupBy(new Fields("a", "b", "c", "b"))
>         .stateQuery(tridentState, new Fields("a", "b", "c", "b"), new 
> MapGet(), new Fields("value"));
>     return topology.build();
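> CustomReduceAggregator is likewise not shown. Trident's ReducerAggregator 
> contract is an init() value plus a reduce(current, tuple) step applied once 
> per tuple in the group; as a hypothetical sketch (assuming it simply counts 
> the tuples in each group), the fold could look like:

```java
// Hypothetical sketch of the reduce step in CustomReduceAggregator; the
// report does not show what is aggregated. Assumption: it counts tuples
// per group, in the init()/reduce() shape Trident's ReducerAggregator uses.
public class ReduceSketch {
    public static Long init() {
        return 0L;                 // initial value for an empty group
    }
    public static Long reduce(Long curr, String[] tuple) {
        return curr + 1;           // fold one tuple into the running value
    }
    public static Long aggregate(Iterable<String[]> tuples) {
        Long acc = init();
        for (String[] t : tuples) {
            acc = reduce(acc, t);
        }
        return acc;
    }
}
```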



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
