METRON-1153 HDFS HdfsWriter never recovers from exceptions closes apache/incubator-metron#741


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b0c6e68c
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b0c6e68c
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b0c6e68c

Branch: refs/heads/Metron_0.4.1
Commit: b0c6e68ce5a3e9caa12711cf76aed1776d3745db
Parents: ccb5a0b
Author: justinleet <[email protected]>
Authored: Fri Sep 8 20:30:02 2017 -0400
Committer: cstella <[email protected]>
Committed: Fri Sep 8 20:30:02 2017 -0400

----------------------------------------------------------------------
 .../metron/writer/hdfs/SourceHandler.java       | 21 ++++-
 .../metron/writer/hdfs/HdfsWriterTest.java      | 91 ++++++++++++++++++--
 2 files changed, 104 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/b0c6e68c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index fbb5561..b841249 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -64,6 +64,7 @@ public class SourceHandler {
     this.rotationPolicy = rotationPolicy;
     this.syncPolicy = syncPolicy;
     this.fileNameFormat = fileNameFormat;
+    this.cleanupCallback = cleanupCallback;
     initialize();
   }
 
@@ -71,12 +72,26 @@ public class SourceHandler {
   protected void handle(JSONObject message, String sensor, WriterConfiguration 
config, SyncPolicyCreator syncPolicyCreator) throws IOException {
     byte[] bytes = (message.toJSONString() + "\n").getBytes();
     synchronized (this.writeLock) {
-      out.write(bytes);
+      try {
+        out.write(bytes);
+      } catch (IOException writeException) {
+        LOG.warn("IOException while writing output", writeException);
+        // If the stream is closed, attempt to rotate the file and try again, hoping it's transient
+        if (writeException.getMessage().contains("Stream Closed")) {
+          LOG.warn("Output Stream was closed. Attempting to rotate file and 
continue");
+          rotateOutputFile();
+          // If this write fails, the exception will be allowed to bubble up.
+          out.write(bytes);
+        } else {
+          throw writeException;
+        }
+      }
       this.offset += bytes.length;
 
       if (this.syncPolicy.mark(null, this.offset)) {
         if (this.out instanceof HdfsDataOutputStream) {
-          ((HdfsDataOutputStream) 
this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+          ((HdfsDataOutputStream) this.out)
+              .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
         } else {
           this.out.hsync();
         }
@@ -146,7 +161,7 @@ public class SourceHandler {
     return path;
   }
 
-  private void closeOutputFile() throws IOException {
+  protected void closeOutputFile() throws IOException {
     this.out.close();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/b0c6e68c/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 0118a15..832f8bf 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -18,11 +18,21 @@
 
 package org.apache.metron.writer.hdfs;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import 
org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
@@ -32,11 +42,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-
 // Suppress ConstantConditions to avoid NPE warnings that only would occur on test failure anyway
 @SuppressWarnings("ConstantConditions")
 public class HdfsWriterTest {
@@ -410,6 +415,82 @@ public class HdfsWriterTest {
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSingleFileIfNoStreamClosed() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    CountSyncPolicy basePolicy = new CountSyncPolicy(5);
+    ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.close();
+
+    File outputFolder = new File(folder.getAbsolutePath() + 
"/test-test.value/test.value/");
+
+    // The message should show up twice, both times in the same file
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+    expected.add(message.toJSONString());
+
+    // Assert both messages are in the same file, because the stream stayed open
+    Assert.assertEquals(1, outputFolder.listFiles().length);
+    for (File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      // Two lines in the single file
+      Assert.assertEquals(2, lines.size());
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testHandleAttemptsRotateIfStreamClosed() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    CountSyncPolicy basePolicy = new CountSyncPolicy(5);
+    ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", 
config).closeOutputFile();
+    writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", 
config).handle(message, SENSOR_NAME, config, creator);
+    writer.close();
+
+    File outputFolder = new File(folder.getAbsolutePath() + 
"/test-test.value/test.value/");
+
+    // The message should show up twice, once in each file
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+
+    // Assert this went into a new file because it actually rotated
+    Assert.assertEquals(2, outputFolder.listFiles().length);
+    for (File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      // One line per file
+      Assert.assertEquals(1, lines.size());
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
   protected WriterConfiguration buildWriterConfiguration(String function) {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     Map<String, Object> sensorIndexingConfig = new HashMap<>();

Reply via email to