[ https://issues.apache.org/jira/browse/STORM-1006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14993592#comment-14993592 ]
Jark Wu commented on STORM-1006:
--------------------------------
Yes, I agree with you. I think a more elegant design would be for Storm itself to
take care of the tuples: wrap them, add a request ID, and unwrap them again, much
like IP/TCP encapsulation. Maybe we should open a new issue to discuss it.
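To make the wrap / add-ID / unwrap idea concrete, here is a minimal sketch in plain Java with no Storm dependency. `Envelope`, `wrap`, `trackingKey` and `unwrap` are hypothetical names, not Storm APIs. The assumption is simply that the framework owns field 0 of every tuple, so any bookkeeping keyed on `getValue(0)` holds a small `Long` rather than the user's payload.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the framework, not the user, owns field 0 of every tuple.
final class Envelope {
    // Counter-based IDs are unique only within one JVM (assumption for the sketch).
    private static final AtomicLong NEXT_ID = new AtomicLong();

    private Envelope() {}

    // Wrap: prepend a fresh, small request ID to the user's values.
    static List<Object> wrap(List<Object> userValues) {
        List<Object> wrapped = new ArrayList<>(userValues.size() + 1);
        wrapped.add(NEXT_ID.incrementAndGet()); // autoboxed to Long
        wrapped.addAll(userValues);
        return wrapped;
    }

    // Tracking is always keyed on the framework's ID, never on user data.
    static Object trackingKey(List<Object> wrapped) {
        return wrapped.get(0);
    }

    // Unwrap: give the user back exactly the values they emitted.
    static List<Object> unwrap(List<Object> wrapped) {
        return Collections.unmodifiableList(wrapped.subList(1, wrapped.size()));
    }
}
```

A distributed version would need IDs that are unique across workers (for example random 64-bit values) instead of a per-JVM counter.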
> Storm is not garbage collecting the messages (causing memory hit)
> -----------------------------------------------------------------
>
> Key: STORM-1006
> URL: https://issues.apache.org/jira/browse/STORM-1006
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka
> Affects Versions: 0.9.3
> Reporter: Sachin Pasalkar
>
> We read a whole file of around 5 MB into memory and send it through Kafka
> to Storm. In the next bolt, we perform an operation on the file and emit a tuple
> to the next bolt. Profiling showed that the file (the file's bytes) is never
> garbage collected. After further investigation we found that the
> backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
> Collection<Tuple>, List<Object>) API takes the first object of the tuple and uses it
> for tracking :(. Can you confirm the reason behind this? Is there any way we can
> send a different unique ID as the first element of the list, or have the tuple's
> unique ID used as the tracking key instead?
> However, for the time being we have changed the scheme assigned to
> KafkaSpout so that it parses the file and emits a list of values.
> If you look at the CoordinatedBolt code below, "Object id = tuple.getValue(0);"
> takes the first element of the tuple instead of the tuple's ID. This "id" is then
> stored in the _tracked map (a TimeCacheMap). In our case the 0th element is the
> file's byte data, so it stays in the _tracked map until the tuple tree completes.
> Because we are processing large amounts of data, we run into an OutOfMemoryError.
> Code:
> public void execute(Tuple tuple) {
>     // The first tuple value, not the tuple's message ID, becomes the tracking key.
>     Object id = tuple.getValue(0);
>     TrackingInfo track;
>     TupleType type = getTupleType(tuple);
>     synchronized(_tracked) {
>         track = _tracked.get(id);
>         if(track==null) {
>             track = new TrackingInfo();
>             if(_idStreamSpec==null) track.receivedId = true;
>             // The payload object itself is stored as a map key here.
>             _tracked.put(id, track);
>         }
>     }
>     if(type==TupleType.ID) {
>         synchronized(_tracked) {
>             track.receivedId = true;
>         }
>         checkFinishId(tuple, type);
>     } else if(type==TupleType.COORD) {
>         int count = (Integer) tuple.getValue(1);
>         synchronized(_tracked) {
>             track.reportCount++;
>             track.expectedTupleCount+=count;
>         }
>         checkFinishId(tuple, type);
>     } else {
>         synchronized(_tracked) {
>             _delegate.execute(tuple);
>         }
>     }
> }
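The retention described in the issue can be reproduced without Storm. This is a minimal, hypothetical model, not Storm code: `TrackingDemo`, `retains` and the empty `TrackingInfo` are illustrative names, and a plain `HashMap` stands in for `_tracked`. Unlike a time-based cache it never expires entries, so removal is left out. It shows that keying on the first tuple value stores a strong reference to the payload, which keeps the byte array reachable for as long as the entry stays in the map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of CoordinatedBolt's bookkeeping; not Storm code.
final class TrackingDemo {
    // Placeholder for the real per-request tracking state.
    static final class TrackingInfo {}

    // Stand-in for CoordinatedBolt's _tracked map (no expiry in this sketch).
    final Map<Object, TrackingInfo> tracked = new HashMap<>();

    // Mirrors "Object id = tuple.getValue(0);": the first value becomes the key.
    void execute(List<Object> tupleValues) {
        Object id = tupleValues.get(0);
        synchronized (tracked) {
            tracked.computeIfAbsent(id, k -> new TrackingInfo());
        }
    }

    // True if the map holds a strong reference to this exact object as a key.
    boolean retains(Object payload) {
        synchronized (tracked) {
            for (Object key : tracked.keySet()) {
                if (key == payload) {
                    return true;
                }
            }
            return false;
        }
    }
}
```

In this model, putting a small ID in field 0 leaves only that ID in the map, so the payload is no longer referenced by the tracking state once the bolt is done with the tuple.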
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)