Repository: incubator-gobblin Updated Branches: refs/heads/master d6e51f59f -> eca08356b
[GOBBLIN-206] Remove extra close of CloseOnFlushWriterWrapper [GOBBLIN-206] Remove extra close of CloseOnFlushWriterWrapper Fix test. Closes #2058 from htran1/close_fix_1 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/eca08356 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/eca08356 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/eca08356 Branch: refs/heads/master Commit: eca08356b0a460627d0db90bc397858497e6e29d Parents: d6e51f5 Author: Hung Tran <[email protected]> Authored: Tue Aug 15 12:44:28 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Aug 15 12:44:28 2017 -0700 ---------------------------------------------------------------------- .../writer/CloseOnFlushWriterWrapper.java | 6 ++- .../writer/CloseOnFlushWriterWrapperTest.java | 39 +++++++++++++++++--- 2 files changed, 37 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eca08356/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java index c244b2d..5b30cba 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java @@ -79,8 +79,10 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De @Override public void close() throws IOException { - writer.close(); - this.closed = true; + if (!this.closed) { + writer.close(); + this.closed = true; + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eca08356/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java index b14793a..6006572 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java @@ -47,7 +47,7 @@ public class CloseOnFlushWriterWrapperTest { Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); - Assert.assertFalse(dummyWriters.get(0).closed); + Assert.assertEquals(dummyWriters.get(0).closeCount, 0); Assert.assertFalse(dummyWriters.get(0).committed); Assert.assertTrue(dummyWriters.get(0).handlerCalled); } @@ -67,7 +67,7 @@ public class CloseOnFlushWriterWrapperTest { Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); - Assert.assertTrue(dummyWriters.get(0).closed); + Assert.assertEquals(dummyWriters.get(0).closeCount, 1); Assert.assertTrue(dummyWriters.get(0).committed); // handler from CloseOnFlushWriterWrapper should have been called instead Assert.assertFalse(dummyWriters.get(0).handlerCalled); @@ -89,7 +89,7 @@ public class CloseOnFlushWriterWrapperTest { Assert.assertEquals(dummyWriters.size(), 1); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); - Assert.assertTrue(dummyWriters.get(0).closed); + Assert.assertEquals(dummyWriters.get(0).closeCount, 1); Assert.assertTrue(dummyWriters.get(0).committed); Assert.assertFalse(dummyWriters.get(0).handlerCalled); @@ -99,11 +99,38 @@ public class CloseOnFlushWriterWrapperTest { Assert.assertEquals(dummyWriters.size(), 2); Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(1).flushCount, 1); - Assert.assertTrue(dummyWriters.get(1).closed); + Assert.assertEquals(dummyWriters.get(1).closeCount, 1); Assert.assertTrue(dummyWriters.get(1).committed); Assert.assertFalse(dummyWriters.get(1).handlerCalled); } + @Test + public void testCloseAfterFlush() + throws IOException { + WorkUnitState state = new WorkUnitState(); + state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true"); + List<DummyWriter> dummyWriters = new ArrayList<>(); + CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state); + + byte[] record = new byte[]{'a', 'b', 'c', 'd'}; + + writer.writeEnvelope(new RecordEnvelope(record)); + writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); + + Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); + Assert.assertEquals(dummyWriters.get(0).flushCount, 1); + Assert.assertEquals(dummyWriters.get(0).closeCount, 1); + Assert.assertTrue(dummyWriters.get(0).committed); + // handler from CloseOnFlushWriterWrapper should have been called instead + Assert.assertFalse(dummyWriters.get(0).handlerCalled); + + writer.close(); + + // writer should not be closed multiple times + Assert.assertEquals(dummyWriters.get(0).closeCount, 1); + } + + private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) { return new CloseOnFlushWriterWrapper<>(new Supplier<DataWriter<byte[]>>() { @Override @@ -119,8 +146,8 @@ public class CloseOnFlushWriterWrapperTest { private int recordsSeen = 0; private byte[] lastWrittenRecord; private int flushCount = 0; + private int closeCount = 0; private boolean committed = false; - private boolean closed = false; private boolean handlerCalled = false; DummyWriter() { @@ -158,7 +185,7 @@ public class CloseOnFlushWriterWrapperTest { @Override public void close() throws IOException { - this.closed = true; + this.closeCount++; } @Override
