This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 23f085a [GOBBLIN-1379] Fix retries in DataWriter
23f085a is described below
commit 23f085a12e2bc628b1c25582f10c79d987a5f65a
Author: aprokofiev <[email protected]>
AuthorDate: Thu Feb 4 11:56:20 2021 -0800
[GOBBLIN-1379] Fix retries in DataWriter
When folder creation fails in
FileAwareInputStreamDataWriter with a
permission error,
the operation is retried. However, the
original permission error was not logged or
shown anywhere. Instead, users saw a
misleading error about incorrect writer state.
Since errors can be different on each attempt,
we're now logging all of them.
In addition, the writer logic is updated to
handle retries correctly when an error
happens at an early stage.
Closes #3219 from aplex/bugfix/writer-retries
---
.../org/apache/gobblin/writer/RetryWriter.java | 30 +++++---
.../writer/FileAwareInputStreamDataWriter.java | 49 +++++++------
.../writer/FileAwareInputStreamDataWriterTest.java | 85 +++++++++++++++++-----
3 files changed, 114 insertions(+), 50 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
index 5fe838c..8e9b4be 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
@@ -16,14 +16,6 @@
*/
package org.apache.gobblin.writer;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.codahale.metrics.Meter;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
@@ -34,7 +26,10 @@ import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
-
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.exception.NonTransientException;
@@ -43,6 +38,8 @@ import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Retry writer follows decorator pattern that retries on inner writer's
failure.
@@ -95,7 +92,6 @@ public class RetryWriter<D> extends
WatermarkAwareWriterWrapper<D> implements Da
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
- LOG.warn("Caught exception. This may be retried.",
attempt.getExceptionCause());
Instrumented.markMeter(retryMeter);
failedWrites++;
}
@@ -177,7 +173,19 @@ public class RetryWriter<D> extends
WatermarkAwareWriterWrapper<D> implements Da
return RetryerBuilder.<Void> newBuilder()
.retryIfException(transients)
.withWaitStrategy(WaitStrategies.exponentialWait(multiplier,
maxWaitMsPerInterval, TimeUnit.MILLISECONDS)) //1, 2, 4, 8, 16 seconds delay
- .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts));
//Total 5 attempts and fail.
+ .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts))
//Total 5 attempts and fail.
+ .withRetryListener(new RetryListener() {
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ // We can get different exceptions on each attempt. The first one
can be meaningful, and follow up
+ // exceptions can come from incorrect state of the system, and
hide the real problem. Logging all of them
+ // to simplify troubleshooting
+ if (attempt.hasException() && attempt.getAttemptNumber() <
maxAttempts) {
+ LOG.warn("Caught exception. Operation will be retried. Attempt
#" + attempt.getAttemptNumber(),
+ attempt.getExceptionCause());
+ }
+ }
+ });
}
@Override
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 852c9b4..cba3b92 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -17,6 +17,11 @@
package org.apache.gobblin.data.management.copy.writer;
+import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -26,25 +31,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import com.codahale.metrics.Meter;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterators;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -75,6 +62,15 @@ import org.apache.gobblin.util.io.StreamCopier;
import org.apache.gobblin.util.io.StreamThrottler;
import org.apache.gobblin.util.io.ThrottledInputStream;
import org.apache.gobblin.writer.DataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
/**
@@ -118,6 +114,11 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
public FileAwareInputStreamDataWriter(State state, int numBranches, int
branchId, String writerAttemptId)
throws IOException {
+ this(state, null, numBranches, branchId, writerAttemptId);
+ }
+
+ public FileAwareInputStreamDataWriter(State state, FileSystem fileSystem,
int numBranches, int branchId, String writerAttemptId)
+ throws IOException {
super(state);
if (numBranches > 1) {
@@ -139,7 +140,11 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
Configuration conf = WriterUtils.getFsConfiguration(state);
URI uri = URI.create(uriStr);
- this.fs = FileSystem.get(uri, conf);
+ if (fileSystem != null) {
+ this.fs = fileSystem;
+ } else {
+ this.fs = FileSystem.get(uri, conf);
+ }
this.fileContext = FileContext.getFileContext(uri, conf);
/**
@@ -196,11 +201,13 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
}
Path stagingFile = getStagingFilePath(copyableFile);
if (this.actualProcessedCopyableFile.isPresent()) {
- throw new IOException(this.getClass().getCanonicalName() + " can only
process one file.");
+ throw new IOException(this.getClass().getCanonicalName() + " can only
process one file and cannot be reused.");
}
- this.actualProcessedCopyableFile = Optional.of(copyableFile);
+
this.fs.mkdirs(stagingFile.getParent());
writeImpl(fileAwareInputStream.getInputStream(), stagingFile,
copyableFile, fileAwareInputStream);
+
+ this.actualProcessedCopyableFile = Optional.of(copyableFile);
this.filesWritten.incrementAndGet();
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index c710116..f9d7896 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -16,33 +16,21 @@
*/
package org.apache.gobblin.data.management.copy.writer;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.AccessDeniedException;
import java.util.List;
import java.util.Properties;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
-import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.crypto.EncryptionConfigParser;
@@ -58,8 +46,23 @@ import
org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.data.management.copy.TestCopyableDataset;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.util.TestUtils;
+import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.io.StreamUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
public class FileAwareInputStreamDataWriterTest {
@@ -420,6 +423,52 @@ public class FileAwareInputStreamDataWriterTest {
new FsPermission("350"));
}
+ @Test
+ public void testRetryingAfterFailure() throws Exception {
+ String streamString1 = "testContents1";
+ FileStatus status = fs.getFileStatus(testTempPath);
+ OwnerAndPermission ownerAndPermission = new
OwnerAndPermission(status.getOwner(), status.getGroup(),
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ CopyableFile cf =
CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+ CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new
TestCopyableDataset(new Path("/source")));
+ WorkUnitState state = TestUtils.createTestWorkUnitState();
+ state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG, false);
+ state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath,
"staging").toString());
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath,
"output").toString());
+ state.setProp(ConfigurationKeys.WRITER_FILE_PATH,
RandomStringUtils.randomAlphabetic(5));
+ CopySource.serializeCopyEntity(state, cf);
+ CopySource.serializeCopyableDataset(state, metadata);
+
+ FileSystem fileSystem = spy(FileSystem.get(URI.create("file:///"),
WriterUtils.getFsConfiguration(state)));
+
+ FileAwareInputStreamDataWriter dataWriter = new
FileAwareInputStreamDataWriter(state, fileSystem, 1, 0, null);
+ FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+ .file(cf)
+
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1,
StandardCharsets.UTF_8)))
+ .build();
+
+ doThrow(new AccessDeniedException("Test")).when(fileSystem).mkdirs(any());
+
+ for (int i = 1; i <= 3; i++) {
+ try {
+ dataWriter.write(fileAwareInputStream);
+ Assert.fail("Expected method to throw AccessDeniedException on call #"
+ i);
+ } catch (AccessDeniedException e) {
+ // expected exception
+ }
+ }
+
+ doCallRealMethod().when(fileSystem).mkdirs(any());
+ dataWriter.write(fileAwareInputStream);
+
+ dataWriter.commit();
+ Path writtenFilePath = new Path(
+ new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
cf.getDatasetAndPartition(metadata).identifier()),
+ cf.getDestination());
+ Assert.assertEquals(IOUtils.toString(new
FileInputStream(writtenFilePath.toString()), StandardCharsets.UTF_8),
+ streamString1);
+ }
+
@AfterClass
public void cleanup() {
try {