Repository: metron Updated Branches: refs/heads/master ccb5a0bdd -> b0c6e68ce
METRON-1153 HDFS HdfsWriter never recovers from exceptions closes apache/incubator-metron#741 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b0c6e68c Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b0c6e68c Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b0c6e68c Branch: refs/heads/master Commit: b0c6e68ce5a3e9caa12711cf76aed1776d3745db Parents: ccb5a0b Author: justinleet <[email protected]> Authored: Fri Sep 8 20:30:02 2017 -0400 Committer: cstella <[email protected]> Committed: Fri Sep 8 20:30:02 2017 -0400 ---------------------------------------------------------------------- .../metron/writer/hdfs/SourceHandler.java | 21 ++++- .../metron/writer/hdfs/HdfsWriterTest.java | 91 ++++++++++++++++++-- 2 files changed, 104 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/b0c6e68c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java index fbb5561..b841249 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java @@ -64,6 +64,7 @@ public class SourceHandler { this.rotationPolicy = rotationPolicy; this.syncPolicy = syncPolicy; this.fileNameFormat = fileNameFormat; + this.cleanupCallback = cleanupCallback; initialize(); } @@ -71,12 +72,26 @@ public class SourceHandler { protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException { byte[] bytes = 
(message.toJSONString() + "\n").getBytes(); synchronized (this.writeLock) { - out.write(bytes); + try { + out.write(bytes); + } catch (IOException writeException) { + LOG.warn("IOException while writing output", writeException); + // If the stream is closed, attempt to rotate the file and try again, hoping it's transient + if (writeException.getMessage().contains("Stream Closed")) { + LOG.warn("Output Stream was closed. Attempting to rotate file and continue"); + rotateOutputFile(); + // If this write fails, the exception will be allowed to bubble up. + out.write(bytes); + } else { + throw writeException; + } + } this.offset += bytes.length; if (this.syncPolicy.mark(null, this.offset)) { if (this.out instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + ((HdfsDataOutputStream) this.out) + .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); } else { this.out.hsync(); } @@ -146,7 +161,7 @@ public class SourceHandler { return path; } - private void closeOutputFile() throws IOException { + protected void closeOutputFile() throws IOException { this.out.close(); } http://git-wip-us.apache.org/repos/asf/metron/blob/b0c6e68c/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java index 0118a15..832f8bf 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java @@ -18,11 +18,21 @@ package org.apache.metron.writer.hdfs; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; @@ -32,11 +42,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.*; - // Suppress ConstantConditions to avoid NPE warnings that only would occur on test failure anyway @SuppressWarnings("ConstantConditions") public class HdfsWriterTest { @@ -410,6 +415,82 @@ public class HdfsWriterTest { } } + @Test + @SuppressWarnings("unchecked") + public void testSingleFileIfNoStreamClosed() throws Exception { + String function = "FORMAT('test-%s/%s', test.key, test.key)"; + WriterConfiguration config = buildWriterConfiguration(function); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + ArrayList<JSONObject> messages = new ArrayList<>(); + messages.add(message); + ArrayList<Tuple> tuples = new ArrayList<>(); + + CountSyncPolicy basePolicy = new CountSyncPolicy(5); + ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy); + + writer.write(SENSOR_NAME, config, tuples, messages); + writer.write(SENSOR_NAME, config, tuples, messages); + writer.close(); + + File outputFolder = new 
File(folder.getAbsolutePath() + "/test-test.value/test.value/"); + + // The message should show up twice, both times in the same file + ArrayList<String> expected = new ArrayList<>(); + expected.add(message.toJSONString()); + expected.add(message.toJSONString()); + + // Assert both messages are in the same file, because the stream stayed open + Assert.assertEquals(1, outputFolder.listFiles().length); + for (File file : outputFolder.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + // Two lines in the single file + Assert.assertEquals(2, lines.size()); + Assert.assertEquals(expected, lines); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testHandleAttemptsRotateIfStreamClosed() throws Exception { + String function = "FORMAT('test-%s/%s', test.key, test.key)"; + WriterConfiguration config = buildWriterConfiguration(function); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + ArrayList<JSONObject> messages = new ArrayList<>(); + messages.add(message); + ArrayList<Tuple> tuples = new ArrayList<>(); + + CountSyncPolicy basePolicy = new CountSyncPolicy(5); + ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy); + + writer.write(SENSOR_NAME, config, tuples, messages); + writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", config).closeOutputFile(); + writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", config).handle(message, SENSOR_NAME, config, creator); + writer.close(); + + File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/"); + + // The message should show up twice, once in each file + ArrayList<String> expected = new ArrayList<>(); + expected.add(message.toJSONString()); + + // Assert this went into a new file because it actually rotated + Assert.assertEquals(2, 
outputFolder.listFiles().length); + for (File file : outputFolder.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + // One line per file + Assert.assertEquals(1, lines.size()); + Assert.assertEquals(expected, lines); + } + } + protected WriterConfiguration buildWriterConfiguration(String function) { IndexingConfigurations indexingConfig = new IndexingConfigurations(); Map<String, Object> sensorIndexingConfig = new HashMap<>();
