Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/741#discussion_r137913296
  
    --- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
 ---
    @@ -64,19 +64,34 @@ public SourceHandler(List<RotationAction> 
rotationActions
         this.rotationPolicy = rotationPolicy;
         this.syncPolicy = syncPolicy;
         this.fileNameFormat = fileNameFormat;
    +    this.cleanupCallback = cleanupCallback;
         initialize();
       }
     
     
       protected void handle(JSONObject message, String sensor, 
WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws 
IOException {
         byte[] bytes = (message.toJSONString() + "\n").getBytes();
         synchronized (this.writeLock) {
    -      out.write(bytes);
    +      try {
    +        out.write(bytes);
    +      } catch (IOException writeException) {
    +        LOG.warn("IOException while writing output", writeException);
    +        // If the stream is closed, attempt to rotate the file and try 
again, hoping it's transient
    +        if (writeException.getMessage().contains("Stream Closed")) {
    +          LOG.warn("Output Stream was closed. Attempting to rotate file 
and continue");
    +          rotateOutputFile();
    +          // If this write fails, the exception will be allowed to bubble 
up.
    +          out.write(bytes);
    --- End diff --
    
    If there is a non-recoverable error, you are in the same position as before.
    I was thinking of having a 'failed' flag ( similar to what hadoop has in 
it's screams where they track closed explicitly )


---

Reply via email to