[
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628729#comment-14628729
]
Robert Joseph Evans commented on STORM-837:
-------------------------------------------
So for each instance of the state, beginCommit, update, and commit will be called
from a single thread, but if there is parallelism for the state, each of them
may be called in parallel for different instances. We have to store the
batch ID somewhere to make this work, but we also need a way to dedupe the data,
either on the read side or as part of the commit. If we store the batch ID with
each tuple and do nothing on the commit side, the consumer of the data has to
dedupe duplicate batches themselves. We can do that on the read side, but we
essentially have to dedupe everything in the data set, even previously processed
data, and hope that the storm code produced the exact same result each time, or
else we could end up with a problem. Because of the complexity of doing this
right, and the fact that we would put this burden on the end user, I personally
would rather see two files employed and dedupe on the commit side. One file is
for writing out data, and the second file is a batch ID index into the first.
In the simplest case the second file is just an edit log and can be thrown away
when the commit is complete.
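To make that concrete, here is a minimal sketch of what the edit log could look
like. Everything here is hypothetical (the EditLog class, the BEGIN/COMMIT
record layout); it is just one way to encode the idea:
{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

// Hypothetical edit log: one text record per event, synced on every write.
//   BEGIN  <batchId> <dataFile> <startOffset>
//   COMMIT <batchId> <dataFile> <endOffset>
class EditLog {
    private final FSDataOutputStream out; // stream to the edit-log file in HDFS

    EditLog(FSDataOutputStream out) {
        this.out = out;
    }

    void logBegin(long batchId, Path dataFile, long offset) throws IOException {
        out.writeBytes("BEGIN " + batchId + " " + dataFile + " " + offset + "\n");
        out.hsync(); // be sure the record is on disk before the batch starts
    }

    void logCommit(long batchId, Path dataFile, long offset) throws IOException {
        out.writeBytes("COMMIT " + batchId + " " + dataFile + " " + offset + "\n");
        out.hsync(); // the batch is durable once this record lands
    }
}
{code}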
beginCommit would write out to the edit log that batch X is starting at file Y
offset Z, and flush to be sure it is on disk.
update would write out the data much as it does now.
commit would at least flush the data file, and possibly close and rotate it to
the final location, and would write out to the edit log that batch X is
finished at file Y at whatever the final offset is.
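Wired together, the happy path might look something like the sketch below. The
class and field names are made up and the serialization is a placeholder; the
point is just where the edit-log writes and syncs would go:
{code:java}
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;

// Hypothetical state that honors beginCommit/commit via the edit log above.
// Error handling and recovery are elided here; see the cases further down.
class EditLogHdfsState implements State {
    private final EditLog editLog;
    private final FSDataOutputStream dataOut; // current data file stream
    private final Path dataFile;

    EditLogHdfsState(EditLog editLog, FSDataOutputStream dataOut, Path dataFile) {
        this.editLog = editLog;
        this.dataOut = dataOut;
        this.dataFile = dataFile;
    }

    @Override
    public void beginCommit(Long txId) {
        try {
            // record where this batch starts so a replay can truncate back here
            editLog.logBegin(txId, dataFile, dataOut.getPos());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
        try {
            for (TridentTuple tuple : tuples) {
                dataOut.writeBytes(tuple.toString() + "\n"); // placeholder serialization
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void commit(Long txId) {
        try {
            dataOut.hsync(); // make the batch durable before acknowledging it
            editLog.logCommit(txId, dataFile, dataOut.getPos());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}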
The interesting code would all be in the error handling.
If the state comes up and sees existing files that have not been rotated, it
needs to recover any batches that were committed but never rotated to the
final location, something like the sketch below.
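A rough pass at that recovery, reading the edit-log records from the sketch
above. Note that FileSystem.truncate needs Hadoop 2.7+, and all the names here
are again invented:
{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical startup recovery: replay the edit log, drop any trailing data
// from a batch whose BEGIN has no matching COMMIT, and rotate the committed
// data to its final location. Record layout matches the EditLog sketch above.
class EditLogRecovery {
    void recover(FileSystem fs, Path editLogPath, Path finalDir) throws IOException {
        String lastCommittedFile = null;
        long lastCommittedOffset = 0;
        boolean pendingBatch = false;

        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(fs.open(editLogPath)))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] parts = line.split(" ");
                if ("BEGIN".equals(parts[0])) {
                    pendingBatch = true;               // a batch started ...
                } else if ("COMMIT".equals(parts[0])) {
                    pendingBatch = false;              // ... and finished
                    lastCommittedFile = parts[2];
                    lastCommittedOffset = Long.parseLong(parts[3]);
                }
            }
        }

        if (lastCommittedFile != null) {
            Path data = new Path(lastCommittedFile);
            if (pendingBatch) {
                // throw away the partial batch after the last committed offset
                fs.truncate(data, lastCommittedOffset);
            }
            // rotate the fully committed data to the final location
            fs.rename(data, new Path(finalDir, data.getName()));
        }
    }
}
{code}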
If beginCommit is called without the previous batch ID having been committed {
    if the new batch ID is the same as the current batch ID, we need to recover
    the batches that succeeded but have not been rotated, then continue on as normal
    else if the new batch ID is not the same as the current batch ID, we disable
    writing any data out until the current batch ID fully commits
}
If beginCommit is called for a batch ID that we already completed, we disable
output until we move on to the next batch ID.
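In code, that decision logic might look something like this; lastCommittedBatchId,
currentBatchId, writesEnabled, and recoverUnrotated are all invented names for
the sketch:
{code:java}
// Hypothetical decision logic for beginCommit. None of these fields or
// helpers exist in HdfsState today; this just spells out the cases above.
class BeginCommitLogic {
    private long lastCommittedBatchId = -1;
    private long currentBatchId = -1;
    private boolean writesEnabled = true;

    void beginCommit(long txId) {
        if (txId <= lastCommittedBatchId) {
            // a batch we already completed is being replayed: disable output
            // until we move on to the next batch ID
            writesEnabled = false;
            return;
        }
        if (currentBatchId >= 0 && currentBatchId != lastCommittedBatchId) {
            // the previous batch was never committed
            if (txId == currentBatchId) {
                // same batch replayed: recover batches that succeeded but were
                // not rotated, then continue on as normal
                recoverUnrotated();
            } else {
                // a different batch while one is pending: disable writes until
                // the current batch ID fully commits
                writesEnabled = false;
                return;
            }
        }
        writesEnabled = true;
        currentBatchId = txId;
    }

    void commit(long txId) {
        lastCommittedBatchId = txId;
        currentBatchId = txId;
    }

    private void recoverUnrotated() {
        // stub: rotate committed-but-unrotated files, truncate partial data
    }
}
{code}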
There may be other cases that we need to think about too; I just threw this out
as a starting point.
> HdfsState ignores commits
> -------------------------
>
> Key: STORM-837
> URL: https://issues.apache.org/jira/browse/STORM-837
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Robert Joseph Evans
> Assignee: Arun Mahadevan
> Priority: Critical
>
> HdfsState works with trident, which is supposed to provide exactly once
> processing. It does this in two ways: first by informing the state about
> commits so it can be sure the data is written out, and second by having a
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with that ignores the
> ids. This means that if you use HdfsState and your worker crashes, you may
> both lose data and get some data twice.
> At a minimum, the flush and file rotation should be tied to the commit in
> some way, and the commit ID should be written out with the data so someone
> reading the data can have a hope of deduping it themselves.
> Also, with the rotationActions it is possible for a file that was partially
> written to be leaked and never moved to the final location, because it is
> not rotated. I personally think the actions are too generic for this case
> and need to be deprecated.