Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d6e51f59f -> eca08356b


[GOBBLIN-206] Remove extra close of CloseOnFlushWriterWrapper

[GOBBLIN-206] Remove extra close of
CloseOnFlushWriterWrapper

Fix test.

Closes #2058 from htran1/close_fix_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/eca08356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/eca08356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/eca08356

Branch: refs/heads/master
Commit: eca08356b0a460627d0db90bc397858497e6e29d
Parents: d6e51f5
Author: Hung Tran <[email protected]>
Authored: Tue Aug 15 12:44:28 2017 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Aug 15 12:44:28 2017 -0700

----------------------------------------------------------------------
 .../writer/CloseOnFlushWriterWrapper.java       |  6 ++-
 .../writer/CloseOnFlushWriterWrapperTest.java   | 39 +++++++++++++++++---
 2 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eca08356/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index c244b2d..5b30cba 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -79,8 +79,10 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
 
   @Override
   public void close() throws IOException {
-    writer.close();
-    this.closed = true;
+    if (!this.closed) {
+      writer.close();
+      this.closed = true;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eca08356/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
index b14793a..6006572 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -47,7 +47,7 @@ public class CloseOnFlushWriterWrapperTest {
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
-    Assert.assertFalse(dummyWriters.get(0).closed);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 0);
     Assert.assertFalse(dummyWriters.get(0).committed);
     Assert.assertTrue(dummyWriters.get(0).handlerCalled);
   }
@@ -67,7 +67,7 @@ public class CloseOnFlushWriterWrapperTest {
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
-    Assert.assertTrue(dummyWriters.get(0).closed);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(0).committed);
     // handler from CloseOnFlushWriterWrapper should have been called instead
     Assert.assertFalse(dummyWriters.get(0).handlerCalled);
@@ -89,7 +89,7 @@ public class CloseOnFlushWriterWrapperTest {
     Assert.assertEquals(dummyWriters.size(), 1);
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
-    Assert.assertTrue(dummyWriters.get(0).closed);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(0).committed);
     Assert.assertFalse(dummyWriters.get(0).handlerCalled);
 
@@ -99,11 +99,38 @@ public class CloseOnFlushWriterWrapperTest {
     Assert.assertEquals(dummyWriters.size(), 2);
     Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(1).flushCount, 1);
-    Assert.assertTrue(dummyWriters.get(1).closed);
+    Assert.assertEquals(dummyWriters.get(1).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(1).committed);
     Assert.assertFalse(dummyWriters.get(1).handlerCalled);
   }
 
+  @Test
+  public void testCloseAfterFlush()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope(record));
+    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
+
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+    // handler from CloseOnFlushWriterWrapper should have been called instead
+    Assert.assertFalse(dummyWriters.get(0).handlerCalled);
+
+    writer.close();
+
+    // writer should not be closed multiple times
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+  }
+
+
   private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) {
     return new CloseOnFlushWriterWrapper<>(new Supplier<DataWriter<byte[]>>() {
       @Override
@@ -119,8 +146,8 @@ public class CloseOnFlushWriterWrapperTest {
     private int recordsSeen = 0;
     private byte[] lastWrittenRecord;
     private int flushCount = 0;
+    private int closeCount = 0;
     private boolean committed = false;
-    private boolean closed = false;
     private boolean handlerCalled = false;
 
     DummyWriter() {
@@ -158,7 +185,7 @@ public class CloseOnFlushWriterWrapperTest {
     @Override
     public void close()
         throws IOException {
-      this.closed = true;
+      this.closeCount++;
     }
 
     @Override

Reply via email to