[ https://issues.apache.org/jira/browse/METRON-1153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159593#comment-16159593 ]

ASF GitHub Bot commented on METRON-1153:
----------------------------------------

Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/741#discussion_r137913296
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java ---
    @@ -64,19 +64,34 @@ public SourceHandler(List<RotationAction> rotationActions
         this.rotationPolicy = rotationPolicy;
         this.syncPolicy = syncPolicy;
         this.fileNameFormat = fileNameFormat;
    +    this.cleanupCallback = cleanupCallback;
         initialize();
       }
     
     
      protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException {
         byte[] bytes = (message.toJSONString() + "\n").getBytes();
         synchronized (this.writeLock) {
    -      out.write(bytes);
    +      try {
    +        out.write(bytes);
    +      } catch (IOException writeException) {
    +        LOG.warn("IOException while writing output", writeException);
    +        // If the stream is closed, attempt to rotate the file and try again, hoping it's transient
    +        if (writeException.getMessage().contains("Stream Closed")) {
    +          LOG.warn("Output Stream was closed. Attempting to rotate file and continue");
    +          rotateOutputFile();
    +          // If this write fails, the exception will be allowed to bubble up.
    +          out.write(bytes);
    --- End diff --
    
    If there is a non-recoverable error, you are in the same position as before.
    I was thinking of having a 'failed' flag ( similar to what Hadoop has in its streams, where they track closed explicitly ).
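
The 'failed' flag suggested here could be sketched roughly as follows. This is an illustration only, assuming a wrapper around the output stream; `GuardedWriter`, `Sink`, and `rotateTo` are invented names, not Metron or Hadoop code:

```java
import java.io.IOException;

// Hypothetical sketch: track stream health explicitly (as Hadoop streams
// track 'closed') instead of matching on exception message text.
public class GuardedWriter {
  interface Sink { void write(byte[] bytes) throws IOException; }

  private Sink out;
  private boolean failed = false;   // set once a write has failed

  public GuardedWriter(Sink out) { this.out = out; }

  public boolean isFailed() { return failed; }

  // Swap in a fresh stream after rotation and clear the flag.
  public void rotateTo(Sink fresh) {
    this.out = fresh;
    this.failed = false;
  }

  public void write(byte[] bytes) throws IOException {
    if (failed) {
      // Never reuse a stream already known to be bad; caller must rotate.
      throw new IOException("stream marked failed; rotate before writing");
    }
    try {
      out.write(bytes);
    } catch (IOException e) {
      failed = true;   // remember the failure explicitly
      throw e;
    }
  }
}
```

The point of the flag is that the recovery decision no longer depends on fragile message-string checks like `contains("Stream Closed")`.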


> HDFS HdfsWriter never recovers from exceptions
> ----------------------------------------------
>
>                 Key: METRON-1153
>                 URL: https://issues.apache.org/jira/browse/METRON-1153
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Otto Fowler
>
> {code:java}
> o.a.m.w.BulkWriterComponent [ERROR] Failing 51 tuples
> java.io.IOException: Stream closed
>         at org.apache.hadoop.crypto.CryptoOutputStream.checkStream(CryptoOutputStream.java:250) ~[stormjar.jar:?]
>         at org.apache.hadoop.crypto.CryptoOutputStream.write(CryptoOutputStream.java:133) ~[stormjar.jar:?]
>         at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) ~[stormjar.jar:?]
>         at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_131]
>         at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_131]
>         at org.apache.metron.writer.hdfs.SourceHandler.handle(SourceHandler.java:74) ~[stormjar.jar:?]
>         at org.apache.metron.writer.hdfs.HdfsWriter.write(HdfsWriter.java:113) ~[stormjar.jar:?]
>         at org.apache.metron.writer.BulkWriterComponent.flush(BulkWriterComponent.java:239) [stormjar.jar:?]
>         at org.apache.metron.writer.BulkWriterComponent.flushTimeouts(BulkWriterComponent.java:281) [stormjar.jar:?]
>         at org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:211) [stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__6573$tuple_action_fn__6575.invoke(executor.clj:734) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.daemon.executor$mk_task_receiver$fn__6494.invoke(executor.clj:469) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.disruptor$clojure_handler$reify__6007.onEvent(disruptor.clj:40) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.daemon.executor$fn__6573$fn__6586$fn__6639.invoke(executor.clj:853) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> {code}
> The SourceHandler does not verify that its output stream is open before
> writing. As a long-running process, it should not assume the stream is
> always valid.
> This is hard, however, because there is no reliable way to check that a
> stream is still healthy.
> An alternative is for the HdfsWriter to remove the source handler when an
> IOException occurs, but the problem then is that tuples are not coupled to
> messages, which means refactoring will be needed from the bolt on down.
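
The removal approach described in the issue could look roughly like the sketch below. The `HandlerCache`, `Handler`, and `HandlerFactory` names are invented for illustration and are not the actual HdfsWriter types:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: on IOException, discard the handler so a fresh one
// (with a fresh stream) is created on the next write for that source.
public class HandlerCache {
  interface Handler { void handle(byte[] bytes) throws IOException; }
  interface HandlerFactory { Handler create(String key); }

  private final Map<String, Handler> handlers = new HashMap<>();
  private final HandlerFactory factory;

  public HandlerCache(HandlerFactory factory) { this.factory = factory; }

  public void write(String key, byte[] bytes) throws IOException {
    Handler h = handlers.computeIfAbsent(key, factory::create);
    try {
      h.handle(bytes);
    } catch (IOException e) {
      // Drop the bad handler. The messages in flight still fail here, which
      // is why the bolt-level batching would need refactoring to replay them.
      handlers.remove(key);
      throw e;
    }
  }

  public boolean hasHandler(String key) { return handlers.containsKey(key); }
}
```

This recovers on the *next* write rather than retrying the current one, which is the trade-off the issue describes.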



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
