This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d0701fb  partial rollback of PR 3464 (#3465)
d0701fb is described below

commit d0701fbbbd76e1ab4f5db8db35edc39171ddf9e3
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Feb 11 08:15:13 2022 -0800

    partial rollback of PR 3464 (#3465)
---
 .../src/main/java/org/apache/gobblin/writer/RetryWriter.java       | 7 ++++++-
 .../src/test/java/org/apache/gobblin/writer/RetryWriterTest.java   | 4 +++-
 .../management/copy/writer/FileAwareInputStreamDataWriter.java     | 4 +++-
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
index 73d0c8f..f484e3f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
@@ -174,7 +174,12 @@ public class RetryWriter<D> extends 
WatermarkAwareWriterWrapper<D> implements Da
 
     long multiplier = state.getPropAsLong(RETRY_MULTIPLIER, 500L);
     long maxWaitMsPerInterval = 
state.getPropAsLong(RETRY_MAX_WAIT_MS_PER_INTERVAL, 10000);
-    int maxAttempts = state.getPropAsInt(RETRY_MAX_ATTEMPTS, 5);
+    // Defaulting retry attempts to 1 because retrying is not possible for every 
kind of source and target record type,
+    // e.g. 1) if the source and destination are InputStream and OutputStream 
respectively, as in the case of
+    // FileAwareInputStreamDataWriter, we may need to reset the InputStream to 
the beginning, which, depending upon the
+    // implementation of InputStream, is not always possible; 2) we need to 
reopen the InputStream, which is closed in
+    // the finally block of writeImpl after the first attempt.
+    int maxAttempts = state.getPropAsInt(RETRY_MAX_ATTEMPTS, 1);
     return RetryerBuilder.<Void> newBuilder()
         .retryIfException(transients)
         .withWaitStrategy(WaitStrategies.exponentialWait(multiplier, 
maxWaitMsPerInterval, TimeUnit.MILLISECONDS)) //1, 2, 4, 8, 16 seconds delay
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
index 8d58070..79b1c22 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
@@ -34,8 +34,10 @@ public class RetryWriterTest {
   public void retryTest() throws IOException {
     DataWriter<Void> writer = mock(DataWriter.class);
     doThrow(new 
RuntimeException()).when(writer).writeEnvelope(any(RecordEnvelope.class));
+    State state = new State();
+    state.setProp(RetryWriter.RETRY_MAX_ATTEMPTS, "5");
 
-    DataWriterWrapperBuilder<Void> builder = new 
DataWriterWrapperBuilder<>(writer, new State());
+    DataWriterWrapperBuilder<Void> builder = new 
DataWriterWrapperBuilder<>(writer, state);
     DataWriter<Void> retryWriter = builder.build();
     try {
       retryWriter.writeEnvelope(new RecordEnvelope<>(null));
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 1b1fec3..7e33ef7 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -85,7 +85,9 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
 
   public static final String GOBBLIN_COPY_BYTES_COPIED_METER = 
"gobblin.copy.bytesCopiedMeter";
   public static final String GOBBLIN_COPY_CHECK_FILESIZE = 
"gobblin.copy.checkFileSize";
-  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = true;
+  // Defaults to false: setting GOBBLIN_COPY_CHECK_FILESIZE to true may result in failures 
because the calculation of
+  // expected bytes to be copied and actual bytes copied may have bugs.
+  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
"gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
false;
 

Reply via email to