[ https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695931#comment-14695931 ]

ASF GitHub Bot commented on STORM-837:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/644#discussion_r37024551
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -69,63 +78,92 @@
     
             abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
     
    -        protected void rotateOutputFile() throws IOException {
    +        abstract long getCurrentOffset() throws  IOException;
    +
    +        abstract void doCommit(Long txId) throws IOException;
    +
    +        abstract void doRecover(Path srcPath, long nBytes) throws Exception;
    +
    +        protected void rotateOutputFile(boolean doRotateAction) throws IOException {
                 LOG.info("Rotating output file...");
                 long start = System.currentTimeMillis();
    -            synchronized (this.writeLock) {
    -                closeOutputFile();
    -                this.rotation++;
    -
    -                Path newFile = createOutputFile();
    +            closeOutputFile();
    +            this.rotation++;
    +            Path newFile = createOutputFile();
    +            if (doRotateAction) {
                     LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
                     for (RotationAction action : this.rotationActions) {
                         action.execute(this.fs, this.currentFile);
                     }
    -                this.currentFile = newFile;
                 }
    +            this.currentFile = newFile;
                 long time = System.currentTimeMillis() - start;
                 LOG.info("File rotation took {} ms.", time);
    +        }
     
    -
    +        protected void rotateOutputFile() throws IOException {
    +            rotateOutputFile(true);
             }
     
    -        void prepare(Map conf, int partitionIndex, int numPartitions){
    -            this.writeLock = new Object();
    -            if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions) {
    +            if (this.rotationPolicy == null) {
    +                throw new IllegalStateException("RotationPolicy must be specified.");
    +            } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
    +                long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
    +                LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
    +                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
    +                LOG.warn("Ensure that the data files does not grow too big with the FileSizeRotationPolicy.");
    +            } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
    +                LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
    +                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
    +                LOG.warn("Ensure that the data files does not grow too big with the TimedRotationPolicy.");
    +            }
    --- End diff --
    
    It might be nicer to move this code into the subclasses' own methods,
    FileSizeRotationPolicy#prepare and TimedRotationPolicy#prepare. These prepare
    methods can each call `super.prepare(conf, partitionIndex, numPartitions)`.
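Below is a minimal sketch of the reviewer's suggestion. It uses hypothetical stand-in classes (RotationPolicyBase, FileSizePolicy, TimedPolicy, OptionsSketch), not the real storm-hdfs types. To keep it easy to check, warnings are collected in a list instead of being sent to LOG.warn. Each policy's prepare calls super.prepare and then emits only its own warnings, so the options' prepare shrinks to a null check plus delegation, with no instanceof chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical base type standing in for a rotation policy; NOT the storm-hdfs API.
abstract class RotationPolicyBase {
    protected int partitionIndex;
    protected int numPartitions;
    // Collected instead of logged so the sketch is easy to inspect.
    private final List<String> warnings = new ArrayList<>();

    // Common preparation shared by every policy.
    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        this.partitionIndex = partitionIndex;
        this.numPartitions = numPartitions;
    }

    // Shared helper for the recovery warnings that size- and time-based policies emit.
    protected void warnRecoveryRisk(String policyName) {
        warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
        warn("Ensure that the data files do not grow too big with the " + policyName + ".");
    }

    protected void warn(String msg) {
        warnings.add(msg); // a real policy would call LOG.warn(...) here
    }

    List<String> warnings() {
        return warnings;
    }
}

// Stand-in for FileSizeRotationPolicy: only size-specific logic lives here.
class FileSizePolicy extends RotationPolicyBase {
    private final long maxBytes;

    FileSizePolicy(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        super.prepare(conf, partitionIndex, numPartitions);
        warn("FileSizeRotationPolicy specified with " + maxBytes + " bytes.");
        warnRecoveryRisk("FileSizeRotationPolicy");
    }
}

// Stand-in for TimedRotationPolicy: only time-specific logic lives here.
class TimedPolicy extends RotationPolicyBase {
    private final long intervalMs;

    TimedPolicy(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        super.prepare(conf, partitionIndex, numPartitions);
        warn("TimedRotationPolicy specified with interval " + intervalMs + " ms.");
        warnRecoveryRisk("TimedRotationPolicy");
    }
}

// Stand-in for the options' prepare: validate, then delegate to the policy.
class OptionsSketch {
    private final RotationPolicyBase rotationPolicy;

    OptionsSketch(RotationPolicyBase rotationPolicy) {
        this.rotationPolicy = rotationPolicy;
    }

    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
        if (rotationPolicy == null) {
            throw new IllegalStateException("RotationPolicy must be specified.");
        }
        rotationPolicy.prepare(conf, partitionIndex, numPartitions);
    }
}
```

A new policy type then adds its own warnings by overriding prepare, without touching the options class.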
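The diff adds abstract getCurrentOffset() and doRecover(Path srcPath, long nBytes) hooks. The new warnings also link recovery to copying data files within topology.message.timeout.secs. From that signature, recovery presumably copies the first nBytes of the old file (up to the last committed offset) into a fresh file, dropping uncommitted bytes. The sketch below makes that assumption. It runs on the local filesystem with java.nio.file.Path instead of Hadoop's Path, and RecoverSketch and recover are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; not the storm-hdfs doRecover implementation.
final class RecoverSketch {
    private RecoverSketch() {}

    // Copies the first nBytes of srcPath into dstPath (created or truncated),
    // dropping anything written after the last committed offset.
    static void recover(Path srcPath, Path dstPath, long nBytes) throws IOException {
        try (InputStream in = Files.newInputStream(srcPath);
             OutputStream out = Files.newOutputStream(dstPath)) {
            byte[] buf = new byte[8192];
            long remaining = nBytes;
            while (remaining > 0) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (n < 0) {
                    throw new IOException("Source has fewer than " + nBytes + " bytes");
                }
                out.write(buf, 0, n);
                remaining -= n;
            }
        }
    }
}
```

The copy time grows linearly with the committed size of the file. That is why large files, whether from a big FileSizeRotationPolicy limit or a long TimedRotationPolicy interval, risk exceeding the message timeout during recovery.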


> HdfsState ignores commits
> -------------------------
>
>                 Key: STORM-837
>                 URL: https://issues.apache.org/jira/browse/STORM-837
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Robert Joseph Evans
>            Assignee: Arun Mahadevan
>            Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once
> processing. It does this in two ways: first, by informing the state about
> commits so that it can be sure the data is written out, and second, by
> providing a commit id so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with them the ids.
> This means that if you use HdfsState and your worker crashes, you may both
> lose data and get some data twice.
> At a minimum, the flush and file rotation should be tied to the commit in
> some way. The commit ID should, at a minimum, be written out with the data
> so that someone reading the data has some hope of deduplicating it
> themselves.
> Also, with the rotationActions it is possible for a partially written file
> to be leaked and never moved to the final location, because it is not
> rotated. I personally think the actions are too generic for this case and
> need to be deprecated.
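Below is a minimal in-memory sketch of what the issue asks for. It uses a hypothetical TxAwareWriterSketch whose beginCommit/commit hooks are loosely modeled on Trident's State#beginCommit(Long txid) and State#commit(Long txid). Records are buffered, tagged with their txid, and written to the "durable" list only on commit. A replayed txid that has already been committed is skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory stand-in for an HDFS-backed Trident state.
// "durable" plays the role of the data file; "pending" is the unflushed buffer.
class TxAwareWriterSketch {
    private final List<String> durable = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private long lastCommittedTxid = -1L;
    private long currentTxid = -1L;

    void beginCommit(long txid) {
        currentTxid = txid;
        pending.clear(); // drop leftovers from a batch that never committed
    }

    void write(String record) {
        // Tag every record with its txid so readers can deduplicate on their own.
        pending.add(currentTxid + "\t" + record);
    }

    void commit(long txid) {
        if (txid <= lastCommittedTxid) {
            pending.clear(); // replayed batch: already durable, so skip it
            return;
        }
        durable.addAll(pending); // the "flush" happens only at commit time
        pending.clear();
        lastCommittedTxid = txid;
    }

    List<String> durableRecords() {
        return durable;
    }
}
```

A real HDFS-backed state would also have to persist lastCommittedTxid durably, for example next to the data. An in-memory counter is lost in exactly the worker crash the issue describes.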



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
