[ 
https://issues.apache.org/jira/browse/SAMZA-699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573112#comment-14573112
 ] 

Yan Fang commented on SAMZA-699:
--------------------------------

Thanks, [~a.pejakovic]. I have added you to the contributors list. The patch 
looks good to me.

[~naveenatceg], does this change make sense to you? 

> CoordinatorStreamMessages lose orders in CoordinatorStreamSystemConsumer
> ------------------------------------------------------------------------
>
>                 Key: SAMZA-699
>                 URL: https://issues.apache.org/jira/browse/SAMZA-699
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Yan Fang
>         Attachments: SAMZA-TASK-699.0.patch
>
>
> Currently we use HashSet<CoordinatorStreamMessage>() for both 
> bootstrappedStreamSet and bootstrappedStream, but the iteration order of a 
> HashSet is unpredictable. As a result, checkpoints can be applied in the 
> wrong order. For example:
> {code}
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 1 
> for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets 
> Partition 0 Checkpoint [offsets={}]
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 0 
> for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets 
> Partition 0 Checkpoint [offsets={SystemStreamPartition [kafka, 
> test-global-stream, 0]=52, SystemStreamPartition [kafka, test-input-stream, 
> 0]=330}]
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 0 
> for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets 
> Partition 2 Checkpoint [offsets={}]
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 2 
> for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets 
> Partition 2 Checkpoint [offsets={}]
> {code}
> The "previous checkpoint offsets" lines are extra logging added for 
> debugging, not output from the system itself.
> The checkpoint may be overwritten by an older checkpoint.
> So I think we should use LinkedHashSet to preserve the order.
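
To illustrate the ordering concern described above, here is a minimal sketch. It is 
not the Samza code or the attached patch. The class OrderedBootstrapSketch, the 
methods replay and sampleMessages, and the use of (taskName, offset) pairs in place 
of CoordinatorStreamMessage are all hypothetical stand-ins. It assumes that a later 
message for the same task should overwrite an earlier one, so the iteration 
order of the set decides which checkpoint survives.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class OrderedBootstrapSketch {

    // Replays (taskName, offset) messages in the set's iteration order.
    // The last message seen for a given task name wins.
    static Map<String, Integer> replay(Set<Map.Entry<String, Integer>> messages) {
        Map<String, Integer> checkpoints = new HashMap<>();
        for (Map.Entry<String, Integer> message : messages) {
            checkpoints.put(message.getKey(), message.getValue());
        }
        return checkpoints;
    }

    // Hypothetical sample data: two checkpoints for "Partition 0", oldest first.
    // LinkedHashSet iterates in insertion order, so offset 330 is replayed last.
    // With a plain HashSet the iteration order is unspecified, so the older
    // offset 52 could be replayed last and overwrite the newer checkpoint.
    static Set<Map.Entry<String, Integer>> sampleMessages() {
        Set<Map.Entry<String, Integer>> messages = new LinkedHashSet<>();
        messages.add(new SimpleImmutableEntry<>("Partition 0", 52));
        messages.add(new SimpleImmutableEntry<>("Partition 0", 330));
        messages.add(new SimpleImmutableEntry<>("Partition 1", 7));
        return messages;
    }

    public static void main(String[] args) {
        Map<String, Integer> checkpoints = replay(sampleMessages());
        System.out.println("Partition 0 -> " + checkpoints.get("Partition 0"));
        System.out.println("Partition 1 -> " + checkpoints.get("Partition 1"));
    }
}
```

Swapping LinkedHashSet for HashSet in sampleMessages makes the surviving offset 
for "Partition 0" depend on hash ordering rather than on arrival order, which is 
the failure mode the description reports.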



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
