This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 23f085a  [GOBBLIN-1379] Fix retries in DataWriter
23f085a is described below

commit 23f085a12e2bc628b1c25582f10c79d987a5f65a
Author: aprokofiev <[email protected]>
AuthorDate: Thu Feb 4 11:56:20 2021 -0800

    [GOBBLIN-1379] Fix retries in DataWriter
    
    When folder creation fails in
    FileAwareInputStreamDataWriter with a
    permission error,
    the operation is retried. However, the original
    permission error was not logged or
    shown anywhere. Instead, users saw a misleading
    error about an incorrect writer state.
    
    Since errors can be different on each attempt,
    we're now logging all of them.
    
    In addition, the writer logic is updated to
    handle retries correctly when an error
    happens at an early stage.
    
    Closes #3219 from aplex/bugfix/writer-retries
---
 .../org/apache/gobblin/writer/RetryWriter.java     | 30 +++++---
 .../writer/FileAwareInputStreamDataWriter.java     | 49 +++++++------
 .../writer/FileAwareInputStreamDataWriterTest.java | 85 +++++++++++++++++-----
 3 files changed, 114 insertions(+), 50 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
index 5fe838c..8e9b4be 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
@@ -16,14 +16,6 @@
  */
 package org.apache.gobblin.writer;
 
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.codahale.metrics.Meter;
 import com.github.rholder.retry.Attempt;
 import com.github.rholder.retry.RetryException;
@@ -34,7 +26,10 @@ import com.github.rholder.retry.StopStrategies;
 import com.github.rholder.retry.WaitStrategies;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.exception.NonTransientException;
@@ -43,6 +38,8 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.FinalState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Retry writer follows decorator pattern that retries on inner writer's 
failure.
@@ -95,7 +92,6 @@ public class RetryWriter<D> extends 
WatermarkAwareWriterWrapper<D> implements Da
         @Override
         public <V> void onRetry(Attempt<V> attempt) {
           if (attempt.hasException()) {
-            LOG.warn("Caught exception. This may be retried.", 
attempt.getExceptionCause());
             Instrumented.markMeter(retryMeter);
             failedWrites++;
           }
@@ -177,7 +173,19 @@ public class RetryWriter<D> extends 
WatermarkAwareWriterWrapper<D> implements Da
     return RetryerBuilder.<Void> newBuilder()
         .retryIfException(transients)
         .withWaitStrategy(WaitStrategies.exponentialWait(multiplier, 
maxWaitMsPerInterval, TimeUnit.MILLISECONDS)) //1, 2, 4, 8, 16 seconds delay
-        .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts)); 
//Total 5 attempts and fail.
+        .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts)) 
//Total 5 attempts and fail.
+        .withRetryListener(new RetryListener() {
+          @Override
+          public <V> void onRetry(Attempt<V> attempt) {
+            // We can get different exceptions on each attempt. The first one 
can be meaningful, and follow up
+            // exceptions can come from incorrect state of the system, and 
hide the real problem. Logging all of them
+            // to simplify troubleshooting
+            if (attempt.hasException() && attempt.getAttemptNumber() < 
maxAttempts) {
+              LOG.warn("Caught exception. Operation will be retried. Attempt 
#" + attempt.getAttemptNumber(),
+                  attempt.getExceptionCause());
+            }
+          }
+        });
   }
 
   @Override
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 852c9b4..cba3b92 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -17,6 +17,11 @@
 
 package org.apache.gobblin.data.management.copy.writer;
 
+import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterators;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -26,25 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import com.codahale.metrics.Meter;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterators;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.broker.EmptyKey;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -75,6 +62,15 @@ import org.apache.gobblin.util.io.StreamCopier;
 import org.apache.gobblin.util.io.StreamThrottler;
 import org.apache.gobblin.util.io.ThrottledInputStream;
 import org.apache.gobblin.writer.DataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 
 /**
@@ -118,6 +114,11 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
 
   public FileAwareInputStreamDataWriter(State state, int numBranches, int 
branchId, String writerAttemptId)
       throws IOException {
+    this(state, null, numBranches, branchId, writerAttemptId);
+  }
+
+  public FileAwareInputStreamDataWriter(State state, FileSystem fileSystem, 
int numBranches, int branchId, String writerAttemptId)
+      throws IOException {
     super(state);
 
     if (numBranches > 1) {
@@ -139,7 +140,11 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
 
     Configuration conf = WriterUtils.getFsConfiguration(state);
     URI uri = URI.create(uriStr);
-    this.fs = FileSystem.get(uri, conf);
+    if (fileSystem != null) {
+      this.fs = fileSystem;
+    } else {
+      this.fs = FileSystem.get(uri, conf);
+    }
     this.fileContext = FileContext.getFileContext(uri, conf);
 
     /**
@@ -196,11 +201,13 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     }
     Path stagingFile = getStagingFilePath(copyableFile);
     if (this.actualProcessedCopyableFile.isPresent()) {
-      throw new IOException(this.getClass().getCanonicalName() + " can only 
process one file.");
+      throw new IOException(this.getClass().getCanonicalName() + " can only 
process one file and cannot be reused.");
     }
-    this.actualProcessedCopyableFile = Optional.of(copyableFile);
+
     this.fs.mkdirs(stagingFile.getParent());
     writeImpl(fileAwareInputStream.getInputStream(), stagingFile, 
copyableFile, fileAwareInputStream);
+
+    this.actualProcessedCopyableFile = Optional.of(copyableFile);
     this.filesWritten.incrementAndGet();
   }
 
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index c710116..f9d7896 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -16,33 +16,21 @@
  */
 package org.apache.gobblin.data.management.copy.writer;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.AccessDeniedException;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.crypto.EncryptionConfigParser;
@@ -58,8 +46,23 @@ import 
org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
 import org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.TestCopyableDataset;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.util.TestUtils;
+import org.apache.gobblin.util.WriterUtils;
 import org.apache.gobblin.util.io.StreamUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
 
 
 public class FileAwareInputStreamDataWriterTest {
@@ -420,6 +423,52 @@ public class FileAwareInputStreamDataWriterTest {
         new FsPermission("350"));
   }
 
+  @Test
+  public void testRetryingAfterFailure() throws Exception {
+    String streamString1 = "testContents1";
+    FileStatus status = fs.getFileStatus(testTempPath);
+    OwnerAndPermission ownerAndPermission = new 
OwnerAndPermission(status.getOwner(), status.getGroup(),
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    CopyableFile cf = 
CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+    WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG, false);
+    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, 
"staging").toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output").toString());
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+
+    FileSystem fileSystem = spy(FileSystem.get(URI.create("file:///"), 
WriterUtils.getFsConfiguration(state)));
+
+    FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, fileSystem, 1, 0, null);
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        .file(cf)
+        
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1, 
StandardCharsets.UTF_8)))
+        .build();
+
+    doThrow(new AccessDeniedException("Test")).when(fileSystem).mkdirs(any());
+
+    for (int i = 1; i <= 3; i++) {
+      try {
+        dataWriter.write(fileAwareInputStream);
+        Assert.fail("Expected method to throw AccessDeniedException on call #" 
+ i);
+      } catch (AccessDeniedException e) {
+        // expected exception
+      }
+    }
+
+    doCallRealMethod().when(fileSystem).mkdirs(any());
+    dataWriter.write(fileAwareInputStream);
+
+    dataWriter.commit();
+    Path writtenFilePath = new Path(
+        new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), 
cf.getDatasetAndPartition(metadata).identifier()),
+        cf.getDestination());
+    Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString()), StandardCharsets.UTF_8),
+        streamString1);
+  }
+
   @AfterClass
   public void cleanup() {
     try {

Reply via email to