Github user ottobackwards commented on a diff in the pull request: https://github.com/apache/metron/pull/741#discussion_r137913296 --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java --- @@ -64,19 +64,34 @@ public SourceHandler(List<RotationAction> rotationActions this.rotationPolicy = rotationPolicy; this.syncPolicy = syncPolicy; this.fileNameFormat = fileNameFormat; + this.cleanupCallback = cleanupCallback; initialize(); } protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException { byte[] bytes = (message.toJSONString() + "\n").getBytes(); synchronized (this.writeLock) { - out.write(bytes); + try { + out.write(bytes); + } catch (IOException writeException) { + LOG.warn("IOException while writing output", writeException); + // If the stream is closed, attempt to rotate the file and try again, hoping it's transient + if (writeException.getMessage().contains("Stream Closed")) { + LOG.warn("Output Stream was closed. Attempting to rotate file and continue"); + rotateOutputFile(); + // If this write fails, the exception will be allowed to bubble up. + out.write(bytes); --- End diff -- If there is a non-recoverable error, you are in the same position as before. I was thinking of having a 'failed' flag (similar to what Hadoop has in its streams, where they track closed explicitly).
---