[
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640886#comment-14640886
]
ASF GitHub Bot commented on STORM-837:
--------------------------------------
Github user ptgoetz commented on the pull request:
https://github.com/apache/storm/pull/644#issuecomment-124612279
It took me some time to follow what's going on with this patch, so I'll
document it here for the benefit of other reviewers.
It operates by using the concept of a temporary "transaction file" that
contains the last successfully written/committed file write offset and the
transaction ID. That file is read in the `prepare()` method call, and is
initialized if it does not already exist. The POJO representing that file is
stored in a `lastTxn` instance variable, which represents the last committed
transaction.
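To make that concrete, here is a minimal sketch of what such a transaction
record and its `prepare()`-time load could look like. The class and member
names (`TxnRecord`, `readFrom`, the comma-separated encoding) are my own
illustration, not the patch's actual API:
```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative record of the last committed transaction: its ID plus the
// data-file offset at which that transaction's writes ended.
class TxnRecord {
    final long txnId;
    final String dataFilePath;
    final long offset;

    TxnRecord(long txnId, String dataFilePath, long offset) {
        this.txnId = txnId;
        this.dataFilePath = dataFilePath;
        this.offset = offset;
    }

    String serialize() {
        return txnId + "," + dataFilePath + "," + offset;
    }

    static TxnRecord deserialize(String line) {
        String[] fields = line.split(",");
        return new TxnRecord(Long.parseLong(fields[0]), fields[1],
                             Long.parseLong(fields[2]));
    }

    // Called from prepare(): load the transaction file if it exists,
    // otherwise start from transaction 0 at offset 0.
    static TxnRecord readFrom(FileSystem fs, Path txnFile, String initialDataFile)
            throws IOException {
        if (!fs.exists(txnFile)) {
            return new TxnRecord(0L, initialDataFile, 0L);
        }
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(fs.open(txnFile)))) {
            return TxnRecord.deserialize(reader.readLine());
        }
    }
}
```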
In the `beginCommit()` method, it checks the incoming transaction ID against
the one stored in `lastTxn`. If it is <= the stored ID (indicating a replay of
a batch), it triggers recovery (more on that later); otherwise it continues
normally, writing data in the `execute()` method.
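Sketched out, that decision looks roughly like the following, where `lastTxn`
is the record loaded in `prepare()` and `recover()` stands in for the
truncate-and-copy step described below (names are illustrative, not the
patch's code):
```java
// Sketch of the beginCommit() decision, assuming a TxnRecord lastTxn field.
public void beginCommit(Long txId) {
    if (txId <= lastTxn.txnId) {
        // Replayed batch: the data file may already contain partial output
        // from the failed attempt, so restore it to the last committed
        // offset before execute() writes anything.
        recover(lastTxn);
    }
    // For a new transaction ID, fall through; execute() appends the
    // batch's tuples as usual.
}
```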
In the `commit()` method, it essentially re-writes the transaction file
with the new transaction ID and the current file offset after the latest batch
data was written. This offset serves as the starting point for recovery.
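As a rough sketch (again with illustrative names, reusing the `TxnRecord`
class from above), the commit step amounts to persisting the new ID together
with the data file's current length; writing a temp file and then renaming it
is my assumption about how to replace the record safely, not necessarily what
the patch does:
```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of commit(): record the new transaction ID and the offset after the
// batch's writes; the returned record becomes the new lastTxn.
static TxnRecord commitTxn(FileSystem fs, Path txnFile, long txId,
                           Path dataFile, long committedOffset) throws IOException {
    TxnRecord newTxn = new TxnRecord(txId, dataFile.toString(), committedOffset);
    Path tmp = new Path(txnFile.getParent(), txnFile.getName() + ".tmp");
    try (FSDataOutputStream out = fs.create(tmp, true)) {  // overwrite any stale temp file
        out.writeBytes(newTxn.serialize());
        out.hflush();                                      // make the record durable
    }
    fs.delete(txnFile, false);                             // replace the old record
    fs.rename(tmp, txnFile);
    return newTxn;
}
```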
**Recovery**
When recovery is triggered, it is assumed that there was a failure during
the `execute()` method, and the data file now contains data from tuples that
will be replayed. If it were to simply append to that file, there would be
duplicate records. So it effectively truncates the data file by creating a new
file, copying everything from the old data file *up to the last
successfully committed offset,* and discarding the remaining data.
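In sketch form (helper and variable names are mine, not the patch's), the
truncate-by-copy step could look like this:
```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of recovery: stream the first committedOffset bytes of the orphaned
// data file into a fresh file; anything past that offset (the uncommitted
// data from the failed batch) is simply never copied.
static Path recoverDataFile(FileSystem fs, Path oldDataFile, long committedOffset)
        throws IOException {
    Path newDataFile = new Path(oldDataFile.getParent(),
                                oldDataFile.getName() + ".recovered");
    byte[] buffer = new byte[64 * 1024];
    long remaining = committedOffset;
    try (FSDataInputStream in = fs.open(oldDataFile);
         FSDataOutputStream out = fs.create(newDataFile, true)) {
        while (remaining > 0) {
            int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
            if (read < 0) {
                break;  // old file is shorter than the recorded offset
            }
            out.write(buffer, 0, read);
            remaining -= read;
        }
        out.hflush();
    }
    // The old file can then be deleted or renamed aside, and subsequent
    // writes go to newDataFile.
    return newDataFile;
}
```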
I think this is an interesting approach and is certainly more resilient
than what we currently have, but I think there's a scaling problem with the
recovery functionality. Recovery works by essentially copying the old/orphaned
data file into a new file and skipping any extra, uncommitted data it contains.
This is fine for small files, but what about the case where the data file is
really big and the recovery time would take longer than
`topology.message.timeout.secs`? For example, with the default 30-second
message timeout, a data file of more than a few GB could easily take longer
than that to copy, so the batch would time out and be replayed into yet
another round of recovery.
Another problem I see is the lack of support for time-based file rotation.
I understand why it's not supported by this patch, but as I mentioned earlier,
that's one of the most common use cases.
> HdfsState ignores commits
> -------------------------
>
> Key: STORM-837
> URL: https://issues.apache.org/jira/browse/STORM-837
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Robert Joseph Evans
> Assignee: Arun Mahadevan
> Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once
> processing. It does this in two ways: first by informing the state about
> commits so it can be sure the data is written out, and second by having a
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with that ignores the
> ids. This means that if you use HdfsState and your worker crashes you may
> both lose data and get some data twice.
> At a minimum the flush and file rotation should be tied to the commit in some
> way. The commit ID should at a minimum be written out with the data so
> someone reading the data can have a hope of deduping it themselves.
> Also, with the rotationActions it is possible for a file that was partially
> written to be leaked and never moved to the final location, because it is
> not rotated. I personally think the actions are too generic for this case
> and need to be deprecated.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)