[ 
https://issues.apache.org/jira/browse/METRON-1153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Otto Fowler updated METRON-1153:
--------------------------------
    Description: 
{code:java}
o.a.m.w.BulkWriterComponent [ERROR] Failing 51 tuples
java.io.IOException: Stream closed
        at org.apache.hadoop.crypto.CryptoOutputStream.checkStream(CryptoOutputStream.java:250) ~[stormjar.jar:?]
        at org.apache.hadoop.crypto.CryptoOutputStream.write(CryptoOutputStream.java:133) ~[stormjar.jar:?]
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) ~[stormjar.jar:?]
        at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_131]
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_131]
        at org.apache.metron.writer.hdfs.SourceHandler.handle(SourceHandler.java:74) ~[stormjar.jar:?]
        at org.apache.metron.writer.hdfs.HdfsWriter.write(HdfsWriter.java:113) ~[stormjar.jar:?]
        at org.apache.metron.writer.BulkWriterComponent.flush(BulkWriterComponent.java:239) [stormjar.jar:?]
        at org.apache.metron.writer.BulkWriterComponent.flushTimeouts(BulkWriterComponent.java:281) [stormjar.jar:?]
        at org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:211) [stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__6573$tuple_action_fn__6575.invoke(executor.clj:734) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__6494.invoke(executor.clj:469) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.disruptor$clojure_handler$reify__6007.onEvent(disruptor.clj:40) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.daemon.executor$fn__6573$fn__6586$fn__6639.invoke(executor.clj:853) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
{code}

The SourceHandler does not verify that the output stream it writes to is still 
open before writing.  Because the writer runs in a long-lived process, it 
should not assume that the stream is always valid.
This is difficult, however, because there is no reliable way to verify that 
the stream is healthy.
Instead, the HdfsWriter could remove the source handler when an IOException 
occurs, but the problem then becomes how to avoid coupling tuples to 
messages, which means refactoring will be needed from the bolt on down.
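The recovery idea could be sketched as follows.  This is an illustration only: 
the classes below (StubSourceHandler, RecoveringWriter) are hypothetical 
stand-ins, not Metron's actual SourceHandler/HdfsWriter API.  The point is 
that on IOException the writer discards its cached handler and rethrows, so 
the next write lazily builds a fresh handler (and stream) instead of failing 
forever against a closed one:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SourceHandler; simulates the underlying
// HDFS stream being closed out from under the writer.
class StubSourceHandler {
    private boolean closed = false;

    void handle(String message) throws IOException {
        if (message.equals("simulate-close")) {
            closed = true;
        }
        if (closed) {
            throw new IOException("Stream closed");
        }
    }
}

// Hypothetical stand-in for HdfsWriter with per-source handler caching.
class RecoveringWriter {
    private final Map<String, StubSourceHandler> handlers = new HashMap<>();
    int handlersCreated = 0; // exposed only to make the recovery visible

    void write(String source, String message) throws IOException {
        StubSourceHandler handler = handlers.computeIfAbsent(source, s -> {
            handlersCreated++;
            return new StubSourceHandler();
        });
        try {
            handler.handle(message);
        } catch (IOException e) {
            // Discard the (likely stale) handler so the next write builds a
            // fresh one, then rethrow so the bolt can fail the tuples and
            // Storm can replay them.
            handlers.remove(source);
            throw e;
        }
    }
}
```

The write that hits the IOException still fails (its tuples must be replayed, 
which is where the tuple/message coupling problem above comes in), but 
subsequent writes recover instead of repeating "Stream closed" indefinitely.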

        Summary: HDFS HdfsWriter never recovers from exceptions  (was: HDFS 
SourceHandler should verify stream before writing)

> HDFS HdfsWriter never recovers from exceptions
> ----------------------------------------------
>
>                 Key: METRON-1153
>                 URL: https://issues.apache.org/jira/browse/METRON-1153
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Otto Fowler
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)