[
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695931#comment-14695931
]
ASF GitHub Bot commented on STORM-837:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/644#discussion_r37024551
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
---
@@ -69,63 +78,92 @@
abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
- protected void rotateOutputFile() throws IOException {
+ abstract long getCurrentOffset() throws IOException;
+
+ abstract void doCommit(Long txId) throws IOException;
+
+ abstract void doRecover(Path srcPath, long nBytes) throws Exception;
+
+ protected void rotateOutputFile(boolean doRotateAction) throws IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
- synchronized (this.writeLock) {
- closeOutputFile();
- this.rotation++;
-
- Path newFile = createOutputFile();
+ closeOutputFile();
+ this.rotation++;
+ Path newFile = createOutputFile();
+ if (doRotateAction) {
LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
for (RotationAction action : this.rotationActions) {
action.execute(this.fs, this.currentFile);
}
- this.currentFile = newFile;
}
+ this.currentFile = newFile;
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms.", time);
+ }
-
+ protected void rotateOutputFile() throws IOException {
+ rotateOutputFile(true);
}
- void prepare(Map conf, int partitionIndex, int numPartitions){
- this.writeLock = new Object();
- if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+
+ void prepare(Map conf, int partitionIndex, int numPartitions) {
+ if (this.rotationPolicy == null) {
+ throw new IllegalStateException("RotationPolicy must be specified.");
+ } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
+ long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
+ LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
+ LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big with the FileSizeRotationPolicy.");
+ } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
+ LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+ LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big with the TimedRotationPolicy.");
+ }
--- End diff --
It might be nicer to move this code into the subclasses' own methods:
FileSizeRotationPolicy#prepare and TimedRotationPolicy#prepare. These prepare
methods can each call `super.prepare(conf, partitionIndex, numPartitions)`.
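
A rough sketch of that shape, replacing the `instanceof` chain with overrides. The class names (`SketchPolicy`, `SketchSizePolicy`, `SketchTimedPolicy`) are hypothetical stand-ins, not storm-hdfs classes, and the warnings are collected in a list rather than logged so the behavior is easy to check:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; NOT the storm-hdfs classes.
abstract class SketchPolicy {
    // Warnings are collected here instead of being sent to a logger.
    protected final List<String> warnings = new ArrayList<>();

    // Shared warning lives in the base class; subclasses add their own.
    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        warnings.add("Recovery will fail if data files cannot be copied within "
                + "topology.message.timeout.secs.");
    }

    List<String> getWarnings() {
        return warnings;
    }
}

class SketchSizePolicy extends SketchPolicy {
    private final long maxBytes;

    SketchSizePolicy(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        super.prepare(conf, partitionIndex, numPartitions);
        warnings.add("Size-based rotation specified with " + maxBytes + " bytes.");
    }
}

class SketchTimedPolicy extends SketchPolicy {
    private final long intervalMs;

    SketchTimedPolicy(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        super.prepare(conf, partitionIndex, numPartitions);
        warnings.add("Timed rotation specified with interval " + intervalMs + " ms.");
    }
}
```

With this layout the caller only needs the null check followed by a single `prepare(...)` call on whatever policy was configured.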
> HdfsState ignores commits
> -------------------------
>
> Key: STORM-837
> URL: https://issues.apache.org/jira/browse/STORM-837
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Robert Joseph Evans
> Assignee: Arun Mahadevan
> Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once
> processing. Trident does this in two ways: first by informing the state about
> commits so it can be sure the data is written out, and second by supplying a
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with that ignores the
> ids. This means that if you use HdfsState and your worker crashes, you may
> both lose data and get some data twice.
> At a minimum, the flush and file rotation should be tied to the commit in some
> way, and the commit ID should be written out with the data so that
> someone reading the data has a chance of deduplicating it themselves.
> Also, with the rotationActions it is possible for a partially written file
> to be leaked and never moved to its final location, because it is never
> rotated. I personally think the actions are too generic for this case and
> need to be deprecated.
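
To make the commit-id idea in the issue description concrete, here is a minimal in-memory sketch of a sink that honors `beginCommit`/`commit`. `TxSinkSketch` is a hypothetical class, not `HdfsState`. It assumes a replayed batch arrives with the same txId as the failed attempt, and it keeps the txId and offset in memory, whereas a real implementation would have to persist them to survive a worker crash:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a transactional file sink; NOT HdfsState.
class TxSinkSketch {
    private final List<String> data = new ArrayList<>(); // stands in for file contents
    private long lastTxId = -1;      // id of the most recent batch that was started
    private int offsetAtTxStart = 0; // size of `data` when that batch began

    // Called when a batch starts. Seeing the same txId again means a replay.
    void beginCommit(long txId) {
        if (txId == lastTxId) {
            // Replay: discard whatever the earlier attempt wrote for this batch.
            data.subList(offsetAtTxStart, data.size()).clear();
        } else {
            lastTxId = txId;
            offsetAtTxStart = data.size();
        }
    }

    void write(String record) {
        data.add(record);
    }

    // Called once the batch succeeds; a real sink would flush or sync here.
    void commit(long txId) {
        // Intentionally empty in this sketch.
    }

    List<String> contents() {
        return new ArrayList<>(data);
    }
}
```

For example, writing batch 1, then writing batch 2 without committing, then replaying batch 2 in full leaves each record in the sink exactly once.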