This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d0701fb partial rollback of PR 3464 (#3465)
d0701fb is described below
commit d0701fbbbd76e1ab4f5db8db35edc39171ddf9e3
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Feb 11 08:15:13 2022 -0800
partial rollback of PR 3464 (#3465)
---
.../src/main/java/org/apache/gobblin/writer/RetryWriter.java | 7 ++++++-
.../src/test/java/org/apache/gobblin/writer/RetryWriterTest.java | 4 +++-
.../management/copy/writer/FileAwareInputStreamDataWriter.java | 4 +++-
3 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
index 73d0c8f..f484e3f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
@@ -174,7 +174,12 @@ public class RetryWriter<D> extends WatermarkAwareWriterWrapper<D> implements Da
long multiplier = state.getPropAsLong(RETRY_MULTIPLIER, 500L);
long maxWaitMsPerInterval =
state.getPropAsLong(RETRY_MAX_WAIT_MS_PER_INTERVAL, 10000);
- int maxAttempts = state.getPropAsInt(RETRY_MAX_ATTEMPTS, 5);
+ // Setting retry attempts to 1 because Retrying is not possible for every kind of source and target record types
+ // e.g. 1) if the source and destination are InputStream and OutputStream respectively as in the case of
+ // FileAwareInputStreamDataWriter, we may need to reset the InputStream to the beginning, which, depending upon the
+ // implementation of InputStream is not always possible, 2) we need to reopen the InputStream which is closed in
+ // the finally block of writeImpl after the first attempt.
+ int maxAttempts = state.getPropAsInt(RETRY_MAX_ATTEMPTS, 1);
return RetryerBuilder.<Void> newBuilder()
.retryIfException(transients)
.withWaitStrategy(WaitStrategies.exponentialWait(multiplier,
maxWaitMsPerInterval, TimeUnit.MILLISECONDS)) //1, 2, 4, 8, 16 seconds delay
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
index 8d58070..79b1c22 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
@@ -34,8 +34,10 @@ public class RetryWriterTest {
public void retryTest() throws IOException {
DataWriter<Void> writer = mock(DataWriter.class);
doThrow(new
RuntimeException()).when(writer).writeEnvelope(any(RecordEnvelope.class));
+ State state = new State();
+ state.setProp(RetryWriter.RETRY_MAX_ATTEMPTS, "5");
- DataWriterWrapperBuilder<Void> builder = new
DataWriterWrapperBuilder<>(writer, new State());
+ DataWriterWrapperBuilder<Void> builder = new
DataWriterWrapperBuilder<>(writer, state);
DataWriter<Void> retryWriter = builder.build();
try {
retryWriter.writeEnvelope(new RecordEnvelope<>(null));
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 1b1fec3..7e33ef7 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -85,7 +85,9 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
public static final String GOBBLIN_COPY_BYTES_COPIED_METER =
"gobblin.copy.bytesCopiedMeter";
public static final String GOBBLIN_COPY_CHECK_FILESIZE =
"gobblin.copy.checkFileSize";
- public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = true;
+ // setting GOBBLIN_COPY_CHECK_FILESIZE to true may result in failures because the calculation of
+ // expected bytes to be copied and actual bytes copied may have bugs
+ public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
"gobblin.copy.task.overwrite.on.commit";
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
false;