[ 
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640886#comment-14640886
 ] 

ASF GitHub Bot commented on STORM-837:
--------------------------------------

Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/644#issuecomment-124612279
  
    It took me some time to follow what's going on with this patch, so I'll 
document it here for the benefit of other reviewers.
    
    The patch works by maintaining a temporary "transaction file" that records 
the last successfully committed transaction ID and the corresponding data-file 
write offset. That file is read in the `prepare()` method call, and is 
initialized if it does not already exist. The POJO representing its contents is 
stored in a `lastTxn` instance variable, which represents the last committed 
transaction.
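    
    Roughly, in code -- class, field, and method names here are my own 
illustration rather than the patch's actual identifiers, and the real code 
keeps this file on HDFS rather than the local filesystem:
    
    ```java
    // Illustrative sketch only; identifiers are not the patch's.
    import java.io.*;
    
    class TxnRecord {
        long txnId;      // last successfully committed transaction id
        long dataOffset; // data-file offset at the end of that transaction's writes
    
        // Called from prepare(): load the record, or start fresh if the file is missing.
        static TxnRecord readOrInit(File txnFile) throws IOException {
            TxnRecord rec = new TxnRecord();
            if (txnFile.exists()) {
                try (DataInputStream in = new DataInputStream(new FileInputStream(txnFile))) {
                    rec.txnId = in.readLong();
                    rec.dataOffset = in.readLong();
                }
            }
            return rec; // the 0/0 defaults mean "nothing committed yet"
        }
    }
    ```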
    
    In the `beginCommit()` method, it checks the incoming transaction ID 
against the one stored in `lastTxn`. If the incoming ID is <= the stored one 
(indicating a replayed batch), it triggers recovery (more on that below); 
otherwise it continues normally, writing data in the `execute()` method.
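    
    A sketch of that check, assuming the illustrative `TxnRecord` above and, 
again, made-up identifiers:
    
    ```java
    // Illustrative skeleton, not the patch's exact code; the same class is
    // assumed by the commit() and recovery sketches below.
    import java.io.*;
    
    class TxnAwareHdfsState {
        File txnFile;      // the temporary transaction file
        File dataFile;     // the data file the state writes tuple data to
        TxnRecord lastTxn; // last committed transaction, loaded in prepare()
    
        public void beginCommit(Long txId) {
            if (txId <= lastTxn.txnId) {
                // Replayed batch: the data file may already contain a partial
                // write for it, so roll back to the committed offset first.
                recover(lastTxn.dataOffset);
            }
            // Otherwise fall through; execute() appends the batch's tuples as usual.
        }
    
        void recover(long committedOffset) { /* see the recovery sketch below */ }
    }
    ```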
    
    In the `commit()` method, it rewrites the transaction file with the new 
transaction ID and the current file offset after the latest batch's data has 
been written. That offset is what a later recovery truncates back to.
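    
    Continuing the same illustrative class (here `currentOffset()` is just a 
stand-in for however the patch tracks the data file's write position):
    
    ```java
    // commit() on the illustrative TxnAwareHdfsState class above.
    public void commit(Long txId) {
        try {
            long offset = currentOffset(); // data-file position after this batch's writes
            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(txnFile))) {
                out.writeLong(txId);       // the committed transaction id
                out.writeLong(offset);     // the offset recovery truncates back to
            }
            lastTxn.txnId = txId;
            lastTxn.dataOffset = offset;
        } catch (IOException e) {
            throw new RuntimeException("failed to persist the transaction record", e);
        }
    }
    ```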
    
    **Recovery**
    When recovery is triggered, the assumption is that there was a failure 
during the `execute()` method, and the data file now contains data from tuples 
that will be replayed. If it were to simply append to that file, there would be 
duplicate records. So it effectively truncates the data file by creating a new 
file, copying everything from the old data file *up to the last successfully 
committed offset*, and discarding the remaining data.
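    
    Filling in the `recover()` stub from the earlier sketch -- again only a 
sketch; the real implementation goes through Hadoop's `FileSystem` API rather 
than `java.io`:
    
    ```java
    // Truncation by copy: keep only the committed prefix of the data file.
    void recover(long committedOffset) {
        File fresh = new File(dataFile.getParent(), dataFile.getName() + ".recovered");
        try (InputStream in = new FileInputStream(dataFile);
             OutputStream out = new FileOutputStream(fresh)) {
            byte[] buf = new byte[8192];
            long remaining = committedOffset;   // copy only up to the committed offset
            while (remaining > 0) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (n < 0) {
                    break;                      // data file shorter than expected
                }
                out.write(buf, 0, n);
                remaining -= n;
            }
        } catch (IOException e) {
            throw new RuntimeException("recovery copy failed", e);
        }
        // Bytes past committedOffset (uncommitted replay leftovers) are simply
        // never copied; subsequent writes go to the fresh file.
    }
    ```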
    
    I think this is an interesting approach, and it is certainly more resilient 
than what we currently have, but I see a scaling problem with the recovery 
functionality. Recovery works by copying the old/orphaned data file into a new 
file while skipping any extra, uncommitted data it contains. That is fine for 
small files, but what about the case where the data file is really big and the 
copy takes longer than `topology.message.timeout.secs`?
    
    Another problem I see is the lack of support for time-based file rotation. 
I understand why it's not supported by this patch, but as I mentioned earlier, 
that's one of the most common use cases.


> HdfsState ignores commits
> -------------------------
>
>                 Key: STORM-837
>                 URL: https://issues.apache.org/jira/browse/STORM-837
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Robert Joseph Evans
>            Assignee: Arun Mahadevan
>            Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once 
> processing.  It does this in two ways: first by informing the state about 
> commits so it can be sure the data is written out, and second by having a 
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with them the ids.  
> This means that if you use HdfsState and your worker crashes, you may both 
> lose data and get some data twice.
> At a minimum the flush and file rotation should be tied to the commit in some 
> way.  The commit ID should at a minimum be written out with the data so 
> someone reading the data can have a hope of deduping it themselves.
> Also, with the rotationActions it is possible for a partially written file 
> to be leaked and never moved to its final location, because it is never 
> rotated.  I personally think the actions are too generic for this case and 
> need to be deprecated.
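
For context, the Trident contract the report refers to is the two-method 
`State` interface (shown as in pre-1.0 Storm; the package later moved under 
`org.apache.storm`):

```java
package storm.trident.state;

// Trident calls beginCommit(txid) before a batch's state updates and commit(txid)
// once they have all been applied; a state that ignores both calls (and their
// ids) cannot detect replayed batches.
public interface State {
    void beginCommit(Long txid);
    void commit(Long txid);
}
```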



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
