[ https://issues.apache.org/jira/browse/STORM-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14965667#comment-14965667 ]

ASF GitHub Bot commented on STORM-1073:
---------------------------------------

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r42546202
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -127,9 +145,112 @@ public void run() {
         }
     
         @Override
    +    public final void execute(Tuple tuple) {
    +
    +        synchronized (this.writeLock) {
    +            boolean forceSync = false;
    +            if (TupleUtils.isTick(tuple)) {
    +                LOG.debug("TICK! forcing a file system flush");
    +                forceSync = true;
    +            } else {
    +                try {
    +                    writeTuple(tuple);
    +                    tupleBatch.add(tuple);
    +                } catch (IOException e) {
    +                    //If the write failed, try to sync anything already written
    +                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
    +                    this.collector.reportError(e);
    +                    forceSync = true;
    +                    this.collector.fail(tuple);
    +                }
    +            }
    +
    +            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
    +                int attempts = 0;
    +                boolean success = false;
    +                IOException lastException = null;
    +                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
    +                // a runtime exception.  The filesystem is presumably in a very bad state.
    +                while (success == false && attempts < fileRetryCount) {
    +                    attempts += 1;
    +                    try {
    +                        syncTuples();
    +                        LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
    +                        for (Tuple t : tupleBatch) {
    +                            this.collector.ack(t);
    +                        }
    +                        tupleBatch.clear();
    +                        syncPolicy.reset();
    +                        success = true;
    +                    } catch (IOException e) {
    +                        LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
    +                        this.collector.reportError(e);
    +                        lastException = e;
    +                    }
    +                }
    +
    +                // If unsuccessful, fail the pending tuples
    +                if (success == false) {
    +                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
    +                    for (Tuple t : tupleBatch) {
    +                        this.collector.fail(t);
    +                    }
    +                    tupleBatch.clear();
    +
    +                    throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
    +                }
    +            }
    +
    +            if(this.rotationPolicy.mark(tuple, this.offset)) {
    +                try {
    +                    rotateOutputFile();
    +                    this.rotationPolicy.reset();
    +                    this.offset = 0;
    +                } catch (IOException e) {
    +                    this.collector.reportError(e);
    +                    LOG.warn("File could not be rotated");
    +                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
    +                    //The next tuple will almost certainly fail to write and/or sync, which will force a rotation.  That
    +                    //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        Map<String, Object> conf = super.getComponentConfiguration();
    +        if (conf == null)
    +            conf = new Config();
    +
    +        if (tickTupleInterval > 0) {
    +            LOG.info("Enabling tick tuple with interval [" + tickTupleInterval + "]");
    --- End diff ---
    
    Use a curly-brace anchor ({}) for the logging here, as above.
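
    For reference, a minimal sketch of the parameterized SLF4J form being suggested
    here (a hypothetical rewrite of the log line above, not part of the actual diff):

        LOG.info("Enabling tick tuple with interval [{}]", tickTupleInterval);

    With the {} anchor the message is only formatted when the log level is enabled,
    instead of paying for string concatenation on every call.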


> SequenceFileBolt can end up in an unrecoverable state
> -----------------------------------------------------
>
>                 Key: STORM-1073
>                 URL: https://issues.apache.org/jira/browse/STORM-1073
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-hdfs
>            Reporter: Aaron Dossett
>            Assignee: Aaron Dossett
>
> SequenceFileBolt has the same issues that HdfsBolt has in STORM-969.  This is 
> also an opportunity to refactor AbstractHdfsBolt to most efficiently include 
> these changes:
> AbstractHdfsBolt should define a concrete execute method and define abstract 
> methods for:
> - writing a tuple
> - synchronizing file output (a rough sketch of these hooks follows below)
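
Based on the writeTuple(...) and syncTuples() calls in the diff above, a rough
sketch of the two abstract hooks the issue describes could look like the
following (the signatures are assumptions, not taken from the merged code;
import paths use the current org.apache.storm packages and the rest of the
class is elided):

    import java.io.IOException;

    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public abstract class AbstractHdfsBolt extends BaseRichBolt {

        // The concrete execute() shown in the diff drives batching, sync retries
        // and file rotation, and delegates the format-specific work to these hooks.

        // Write a single tuple to the currently open file; HdfsBolt and
        // SequenceFileBolt each implement their own on-disk format here.
        protected abstract void writeTuple(Tuple tuple) throws IOException;

        // Flush/sync any buffered output to the underlying filesystem.
        protected abstract void syncTuples() throws IOException;
    }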


