[ 
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628729#comment-14628729
 ] 

Robert Joseph Evans commented on STORM-837:
-------------------------------------------

So for each instance of the state, beginCommit, update, and commit will be 
called from a single thread, but if there is parallelism for the state, 
different instances may be called in parallel.  We have to store the batch ID 
somewhere to make this work, and we need a way to dedupe the data, either on 
the read side or as part of the commit.  If we store the batch ID with each 
tuple and do nothing on the commit side, the consumer of the data has to dedup 
duplicate batches.  We can do that on the read side, but we essentially have 
to dedupe everything in the data set, even previously processed data, and hope 
that the storm code produced the exact same result each time, or else we could 
end up with a problem.  Because of the complexity of doing this right, and the 
fact that we would put this burden on the end user, I personally would rather 
see two files employed and dedupe on the commit side.  One file is for writing 
out data, and the second file is a batch ID index into the first file.
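To make the read-side option concrete, here is a minimal sketch (in Python for brevity; the real HdfsState is Java, and all names here are illustrative, not Storm APIs).  Each record carries its batch ID and position, and the reader drops exact duplicates.  Note the caveats above: this only works if a replayed batch produces byte-identical records, and the reader has to track everything it has ever seen.

```python
def dedupe(records):
    """records: iterable of (batch_id, index, value) in file order.

    Yields each value once, dropping records that are exact replays of
    something already read. Illustrative only; not a Storm API.
    """
    seen = set()
    for rec in records:
        if rec in seen:
            continue  # a replayed, byte-identical record: skip it
        seen.add(rec)
        yield rec[2]
```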

In the simplest case the second file is just an edit log and can be thrown away 
when the commit is complete.

beginCommit would write out to the edit log that batch X is starting at file Y 
offset Z, and flush to be sure it is on disk.
update would write out the data much as it does now.
commit would at least flush the data file, and possibly close and rotate it to 
the final location, and would write out to the edit log that batch X is 
finished at file Y offset whatever.
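The happy path of that two-file scheme could look roughly like this (a sketch in Python for brevity; the real HdfsState is Java, and the class and record format here are illustrative assumptions, not the actual API):

```python
import os

class EditLogState:
    """Sketch of the proposed scheme: a data file plus an edit log that
    records where each batch starts and ends in the data file."""

    def __init__(self, data_path, log_path):
        self.data = open(data_path, "ab")
        self.log = open(log_path, "a")

    def begin_commit(self, batch_id):
        # Record that batch X is starting at file Y offset Z, and flush
        # to be sure it is on disk before any data is written.
        offset = self.data.tell()
        self.log.write("BEGIN %s %s %d\n" % (batch_id, self.data.name, offset))
        self.log.flush()
        os.fsync(self.log.fileno())

    def update(self, tuples):
        # Write out the data, much as HdfsState does today.
        for t in tuples:
            self.data.write((t + "\n").encode())

    def commit(self, batch_id):
        # Flush the data file (a real implementation might also close and
        # rotate it here), then record that the batch finished.
        self.data.flush()
        os.fsync(self.data.fileno())
        self.log.write("END %s %s %d\n" % (batch_id, self.data.name,
                                           self.data.tell()))
        self.log.flush()
        os.fsync(self.log.fileno())
```

Once the END record is durable, the edit log entries for that batch can be discarded, which is why in the simplest case the log is throwaway.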

The interesting code would all be in the error handling.

If the state comes up and sees existing files that have not been rotated, it 
needs to recover any batches that were committed but not rotated to the final 
location.
If beginCommit is called without the previous batch ID being committed {
    if the commit ID is the same as the current batch ID, we need to recover 
the batches that succeeded but have not been rotated, then continue on as 
normal
    else if the commit ID is not the same as the current batch ID, we disable 
writing any data out until the current batch ID fully commits.
}
If beginCommit is called for a batch ID that we already completed, we disable 
output until we move on to the next batch ID.
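Those rules could be folded into one decision function on beginCommit, something like the sketch below (illustrative names again; it assumes the edit log has been parsed into (kind, batch_id, offset) tuples):

```python
def plan_recovery(entries, incoming_id):
    """Decide what to do when beginCommit(incoming_id) arrives after a crash.

    entries: parsed edit-log records as ("BEGIN"|"END", batch_id, offset).
    Returns (action, truncate_to): action is "write" (process the batch
    normally) or "skip" (disable output until this batch ID is past);
    truncate_to is the data-file offset of the last completed batch, so
    any uncommitted partial tail beyond it can be discarded.
    """
    begun = [bid for kind, bid, _ in entries if kind == "BEGIN"]
    ended = {bid: off for kind, bid, off in entries if kind == "END"}
    last_end = max(ended.values(), default=0)

    if incoming_id in ended:
        # beginCommit for a batch we already completed: its data is safely
        # on disk, so suppress output until we move on to the next batch ID.
        return "skip", last_end

    if begun and begun[-1] not in ended:
        # The previous beginCommit never committed.
        if incoming_id == begun[-1]:
            # Same batch replayed: drop the partial tail and redo it.
            return "write", last_end
        # A different batch: disable writes until this batch fully commits.
        return "skip", last_end

    return "write", last_end
```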

There may be other cases that we need to think about too; I just threw this 
out as a starting point.

> HdfsState ignores commits
> -------------------------
>
>                 Key: STORM-837
>                 URL: https://issues.apache.org/jira/browse/STORM-837
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Robert Joseph Evans
>            Assignee: Arun Mahadevan
>            Priority: Critical
>
> HdfsState works with trident which is supposed to provide exactly once 
> processing.  It does this two ways, first by informing the state about 
> commits so it can be sure the data is written out, and second by having a 
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with that ignores the 
> ids.  This means that if you use HdfsState and your worker crashes you may 
> both lose data and get some data twice.
> At a minimum the flush and file rotation should be tied to the commit in some 
> way.  The commit ID should at a minimum be written out with the data so 
> someone reading the data can have a hope of deduping it themselves.
> Also with the rotationActions it is possible for a file that was partially 
> written to be leaked and never moved to the final location, because it is 
> not rotated.  I personally think the actions are too generic for this case 
> and need to be deprecated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
