[
https://issues.apache.org/jira/browse/STORM-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14965667#comment-14965667
]
ASF GitHub Bot commented on STORM-1073:
---------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/767#discussion_r42546202
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
---
@@ -127,9 +145,112 @@ public void run() {
}
@Override
+ public final void execute(Tuple tuple) {
+
+ synchronized (this.writeLock) {
+ boolean forceSync = false;
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK! forcing a file system flush");
+ forceSync = true;
+ } else {
+ try {
+ writeTuple(tuple);
+ tupleBatch.add(tuple);
+ } catch (IOException e) {
+ //If the write failed, try to sync anything already
written
+ LOG.info("Tuple failed to write, forcing a flush of
existing data.");
+ this.collector.reportError(e);
+ forceSync = true;
+ this.collector.fail(tuple);
+ }
+ }
+
+ if (this.syncPolicy.mark(tuple, this.offset) || (forceSync &&
tupleBatch.size() > 0)) {
+ int attempts = 0;
+ boolean success = false;
+ IOException lastException = null;
+ // Make every attempt to sync the data we have. If it
can't be done then kill the bolt with
+ // a runtime exception. The filesystem is presumably in a
very bad state.
+ while (success == false && attempts < fileRetryCount) {
+ attempts += 1;
+ try {
+ syncTuples();
+ LOG.debug("Data synced to filesystem. Ack'ing [{}]
tuples", tupleBatch.size());
+ for (Tuple t : tupleBatch) {
+ this.collector.ack(t);
+ }
+ tupleBatch.clear();
+ syncPolicy.reset();
+ success = true;
+ } catch (IOException e) {
+ LOG.warn("Data could not be synced to filesystem
on attempt [" + attempts + "]");
+ this.collector.reportError(e);
+ lastException = e;
+ }
+ }
+
+ // If unsuccesful fail the pending tuples
+ if (success == false) {
+ LOG.warn("Data could not be synced to filesystem,
failing this batch of tuples");
+ for (Tuple t : tupleBatch) {
+ this.collector.fail(t);
+ }
+ tupleBatch.clear();
+
+ throw new RuntimeException("Sync failed [" + attempts
+ "] times.", lastException);
+ }
+ }
+
+ if(this.rotationPolicy.mark(tuple, this.offset)) {
+ try {
+ rotateOutputFile();
+ this.rotationPolicy.reset();
+ this.offset = 0;
+ } catch (IOException e) {
+ this.collector.reportError(e);
+ LOG.warn("File could not be rotated");
+ //At this point there is nothing to do. In all
likelihood any filesystem operations will fail.
+ //The next tuple will almost certainly fail to write
and/or sync, which force a rotation. That
+ //will give rotateAndReset() a chance to work which
includes creating a fresh file handle.
+ }
+ }
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = super.getComponentConfiguration();
+ if (conf == null)
+ conf = new Config();
+
+ if (tickTupleInterval > 0) {
+ LOG.info("Enabling tick tuple with interval [" +
tickTupleInterval + "]");
--- End diff --
Use a curly-brace (`{}`) placeholder for this log message instead of string concatenation, as in the log statements above.
> SequenceFileBolt can end up in an unrecoverable state
> -----------------------------------------------------
>
> Key: STORM-1073
> URL: https://issues.apache.org/jira/browse/STORM-1073
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-hdfs
> Reporter: Aaron Dossett
> Assignee: Aaron Dossett
>
> SequenceFileBolt has the same issues that HdfsBolt has in STORM-969. This is
> also an opportunity to refactor AbstractHdfsBolt so that these changes can be
> included as efficiently as possible:
> AbstractHdfsBolt should define a concrete execute method, plus abstract
> methods for:
> - writing a tuple
> - synchronizing file output
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)