This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4c1fd44 [GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a
commit has already been invoked on the underlying writer.[]
4c1fd44 is described below
commit 4c1fd444d05ead53d5df9999d2263035f623bdd3
Author: sv2000 <[email protected]>
AuthorDate: Mon Apr 8 17:44:03 2019 -0700
[GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has
already been invoked on the underlying writer.[]
Closes #2594 from sv2000/closeOnFlushWriterWrapper
---
.../org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 5e912bb..bc6c9b9 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -57,6 +57,8 @@ public class CloseOnFlushWriterWrapper<D> extends
WriterWrapper<D> implements De
private DataWriter<D> writer;
private final Supplier<DataWriter<D>> writerSupplier;
private boolean closed;
+ private boolean committed;
+
// is the close functionality enabled?
private final boolean closeOnFlush;
private final ControlMessageHandler controlMessageHandler;
@@ -90,6 +92,7 @@ public class CloseOnFlushWriterWrapper<D> extends
WriterWrapper<D> implements De
if (this.closed) {
this.writer = writerSupplier.get();
this.closed = false;
+ this.committed = false;
}
this.writer.writeEnvelope(record);
}
@@ -104,13 +107,15 @@ public class CloseOnFlushWriterWrapper<D> extends
WriterWrapper<D> implements De
@Override
public void commit() throws IOException {
- writer.commit();
+ if (!this.committed) {
+ writer.commit();
+ this.committed = true;
+ }
}
@Override
public void cleanup() throws IOException {
writer.cleanup();
-
}
@Override