[ 
https://issues.apache.org/jira/browse/STORM-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700714#comment-14700714
 ] 

ASF GitHub Bot commented on STORM-969:
--------------------------------------

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/664#discussion_r37263411
  
    --- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---
    @@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext 
topologyContext, OutputCollector
     
         @Override
         public void execute(Tuple tuple) {
    -        try {
    -            byte[] bytes = this.format.format(tuple);
    -            synchronized (this.writeLock) {
    -                out.write(bytes);
    -                this.offset += bytes.length;
    -
    -                if (this.syncPolicy.mark(tuple, this.offset)) {
    -                    if (this.out instanceof HdfsDataOutputStream) {
    -                        ((HdfsDataOutputStream) 
this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    -                    } else {
    -                        this.out.hsync();
    -                    }
    -                    this.syncPolicy.reset();
    +        boolean forceRotate = false;
    +        synchronized (this.writeLock) {
    +            boolean forceSync = false;
    +            if (TupleUtils.isTick(tuple)) {
    +                LOG.debug("TICK! forcing a file system flush");
    +                forceSync = true;
    +            }
    +            else {
    +                try {
    +                    writeAndAddTuple(tuple);
    +                } catch (IOException e) {
    +                    //If the write failed, try to sync anything already 
written
    +                    LOG.info("Tuple failed to write, forcing a flush of 
existing data.");
    +                    this.collector.reportError(e);
    +                    forceSync = true;
    +                    this.collector.fail(tuple);
                     }
                 }
     
    -            this.collector.ack(tuple);
    +            if (this.syncPolicy.mark(tuple, this.offset) || forceSync) {
    +                try {
    +                    syncAndAckTuples();
    +                } catch (IOException e) {
    +                    LOG.warn("Data could not be synced to filesystem, 
failing this batch of tuples");
    +                    this.collector.reportError(e);
    +                    //Force rotation to get a new file handle
    +                    forceRotate = true;
    +                    for (Tuple t : tupleBatch)
    +                        this.collector.fail(t);
    +                    tupleBatch.clear();
    +                }
    +            }
    +        }
     
    -            if(this.rotationPolicy.mark(tuple, this.offset)){
    -                rotateOutputFile(); // synchronized
    -                this.offset = 0;
    -                this.rotationPolicy.reset();
    +        if(this.rotationPolicy.mark(tuple, this.offset) || forceRotate) {
    +            try {
    +                rotateAndReset();
    +            } catch (IOException e) {
    +                this.collector.reportError(e);
    +                LOG.warn("File could not be rotated");
    +                //At this point there is nothing to do.  In all likelihood 
any filesystem operations will fail.
    +                //The next tuple will almost certainly fail to write 
and/or sync, which force a rotation.  That
    +                //will give rotateAndReset() a chance to work which 
includes creating a fresh file handle.
                 }
    -        } catch (IOException e) {
    -            this.collector.reportError(e);
    -            this.collector.fail(tuple);
             }
         }
     
    +    private void rotateAndReset() throws IOException {
    --- End diff --
    
    You could have a low default (say 10 continuous failures or so) and also 
add an option for the users to change this. @harshach can you recommend a 
default number of retries after which the task can exit so that storm would 
spawn the task in another worker where it might succeed?


> HDFS Bolt can end up in an unrecoverable state
> ----------------------------------------------
>
>                 Key: STORM-969
>                 URL: https://issues.apache.org/jira/browse/STORM-969
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-hdfs
>            Reporter: Aaron Dossett
>            Assignee: Aaron Dossett
>
> The body of the HDFSBolt.execute() method is essentially one try-catch block. 
>  The catch block reports the error and fails the current tuple.  In some 
> cases the bolt's FSDataOutputStream object (named 'out') is in an 
> unrecoverable state and no subsequent calls to execute() can succeed.
> To produce this scenario:
> - process some tuples through HDFS bolt
> - put the underlying HDFS system into safemode
> - process some more tuples and receive a correct ClosedChannelException
> - take the underlying HDFS system out of safemode
> - subsequent tuples continue to fail with the same exception
> The three fundamental operations that execute takes (writing, sync'ing, 
> rotating) need to be isolated so that errors from each are specifically 
> handled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to