[ https://issues.apache.org/jira/browse/STORM-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700305#comment-14700305 ]
ASF GitHub Bot commented on STORM-969:
--------------------------------------

Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/664#discussion_r37242296

--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---
@@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
     @Override
     public void execute(Tuple tuple) {
-        try {
-            byte[] bytes = this.format.format(tuple);
-            synchronized (this.writeLock) {
-                out.write(bytes);
-                this.offset += bytes.length;
-
-                if (this.syncPolicy.mark(tuple, this.offset)) {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
-                    }
-                    this.syncPolicy.reset();
+        boolean forceRotate = false;
+        synchronized (this.writeLock) {
+            boolean forceSync = false;
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK! forcing a file system flush");
+                forceSync = true;
+            }
+            else {
+                try {
+                    writeAndAddTuple(tuple);
+                } catch (IOException e) {
+                    //If the write failed, try to sync anything already written
+                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
+                    this.collector.reportError(e);
+                    forceSync = true;
+                    this.collector.fail(tuple);
                 }
             }
-            this.collector.ack(tuple);
+            if (this.syncPolicy.mark(tuple, this.offset) || forceSync) {
+                try {
+                    syncAndAckTuples();
+                } catch (IOException e) {
+                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
+                    this.collector.reportError(e);
+                    //Force rotation to get a new file handle
+                    forceRotate = true;
+                    for (Tuple t : tupleBatch)
+                        this.collector.fail(t);
+                    tupleBatch.clear();
+                }
+            }
+        }

-            if (this.rotationPolicy.mark(tuple, this.offset)) {
-                rotateOutputFile(); // synchronized
-                this.offset = 0;
-                this.rotationPolicy.reset();
+        if (this.rotationPolicy.mark(tuple, this.offset) || forceRotate) {
+            try {
+                rotateAndReset();
+            } catch (IOException e) {
+                this.collector.reportError(e);
+                LOG.warn("File could not be rotated");
+                //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
+                //The next tuple will almost certainly fail to write and/or sync, which will force a rotation. That
+                //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
             }
-        } catch (IOException e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
         }
     }

+    private void rotateAndReset() throws IOException {
+        rotateOutputFile(); // synchronized
+        this.offset = 0;
+        this.rotationPolicy.reset();
+    }
+
+    private void syncAndAckTuples() throws IOException {
--- End diff --

Agreed, just pushed a commit to do that.

> HDFS Bolt can end up in an unrecoverable state
> ----------------------------------------------
>
>                 Key: STORM-969
>                 URL: https://issues.apache.org/jira/browse/STORM-969
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-hdfs
>            Reporter: Aaron Dossett
>            Assignee: Aaron Dossett
>
> The body of the HDFSBolt.execute() method is essentially one try-catch block.
> The catch block reports the error and fails the current tuple. In some
> cases the bolt's FSDataOutputStream object (named 'out') is in an
> unrecoverable state, and no subsequent calls to execute() can succeed.
>
> To reproduce this scenario:
> - process some tuples through the HDFS bolt
> - put the underlying HDFS system into safemode
> - process some more tuples and receive a correct ClosedChannelException
> - take the underlying HDFS system out of safemode
> - subsequent tuples continue to fail with the same exception
>
> The three fundamental operations that execute() performs (writing, sync'ing,
> rotating) need to be isolated so that errors from each are specifically
> handled.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
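[Editor's note] The patch above isolates the three stages of execute() — write, sync, rotate — so that a failure in one stage cannot permanently wedge the bolt. Below is a minimal, self-contained sketch of that error-isolation pattern. All names here (IsolatedStagesSketch, Stream, etc.) are hypothetical stand-ins, not the real HdfsBolt API; the actual implementation lives in org.apache.storm.hdfs.bolt.HdfsBolt.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the write/sync/rotate error-isolation pattern.
class IsolatedStagesSketch {

    // Stand-in for the HDFS output stream (the real bolt uses FSDataOutputStream).
    public interface Stream {
        void write(String record) throws IOException;
        void sync() throws IOException;
    }

    private final Stream out;
    private final List<String> unsynced = new ArrayList<>(); // written but not yet synced
    private int acked = 0;
    private int failed = 0;
    private int rotations = 0;

    public IsolatedStagesSketch(Stream out) { this.out = out; }

    public void execute(String tuple, boolean syncNow) {
        boolean forceSync = false;
        boolean forceRotate = false;

        // Stage 1: write. A failed write fails only this tuple, but forces a
        // sync of anything already buffered.
        try {
            out.write(tuple);
            unsynced.add(tuple);
        } catch (IOException e) {
            failed++;
            forceSync = true;
        }

        // Stage 2: sync. A failed sync fails the whole unsynced batch and
        // forces a rotation so the next write gets a fresh file handle.
        if (syncNow || forceSync) {
            try {
                out.sync();
                acked += unsynced.size();
            } catch (IOException e) {
                failed += unsynced.size();
                forceRotate = true;
            }
            unsynced.clear();
        }

        // Stage 3: rotate. Errors here are reported but swallowed: if rotation
        // fails, the next tuple's write/sync will fail and force another attempt.
        if (forceRotate) {
            try {
                rotations++; // stand-in for rotateOutputFile() + offset/policy reset
            } catch (RuntimeException e) {
                // nothing more to do; recovery happens on the next tuple
            }
        }
    }

    public int ackedCount()    { return acked; }
    public int failedCount()   { return failed; }
    public int rotationCount() { return rotations; }
}
```

With a stream whose sync fails once (as HDFS safemode would cause) and then recovers, this structure fails the affected batch, rotates, and then resumes acking — rather than failing every subsequent tuple as the pre-patch single try-catch did.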