Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 71184cdb1 -> 83dd2c572


[GOBBLIN-582] Allow file overwrite when copying from task staging to task 
output for FileAwareInputStreamDataWriter.

Closes #2446 from sv2000/distcpTaskCommit


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/83dd2c57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/83dd2c57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/83dd2c57

Branch: refs/heads/master
Commit: 83dd2c572d7dd117cdb6af3e896ffecc6fde66ca
Parents: 71184cd
Author: sv2000 <[email protected]>
Authored: Tue Oct 2 09:34:03 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Oct 2 09:34:03 2018 -0700

----------------------------------------------------------------------
 .../writer/FileAwareInputStreamDataWriter.java  | 30 ++++++++++++++------
 1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/83dd2c57/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index cda144e..bb9819e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -29,8 +29,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -40,7 +42,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
-import com.google.common.io.Closer;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -64,7 +65,6 @@ import 
org.apache.gobblin.data.management.copy.OwnerAndPermission;
 import org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
-import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.FinalState;
@@ -86,6 +86,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   public static final String GOBBLIN_COPY_BYTES_COPIED_METER = 
"gobblin.copy.bytesCopiedMeter";
   public static final String GOBBLIN_COPY_CHECK_FILESIZE = 
"gobblin.copy.checkFileSize";
   public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
+  public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
"gobblin.copy.task.overwrite.on.commit";
+  public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
false;
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -99,6 +101,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
   protected final int bufferSize;
   private final boolean checkFileSize;
+  private final Options.Rename renameOptions;
+  private final FileContext fileContext;
 
   protected final Meter copySpeedMeter;
 
@@ -127,11 +131,15 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
 
     this.writerAttemptIdOptional = Optional.fromNullable(writerAttemptId);
 
-    String uri = this.state.getProp(
+    String uriStr = this.state.getProp(
         
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
 numBranches, branchId),
         ConfigurationKeys.LOCAL_FS_URI);
 
-    this.fs = FileSystem.get(URI.create(uri), 
WriterUtils.getFsConfiguration(state));
+    Configuration conf = WriterUtils.getFsConfiguration(state);
+    URI uri = URI.create(uriStr);
+    this.fs = FileSystem.get(uri, conf);
+    this.fileContext = FileContext.getFileContext(uri, conf);
+
     this.stagingDir = this.writerAttemptIdOptional.isPresent() ? WriterUtils
         .getWriterStagingDir(state, numBranches, branchId, 
this.writerAttemptIdOptional.get())
         : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
@@ -148,6 +156,12 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         .getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, 
this.state, numBranches, branchId);
 
     this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, 
DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
+    boolean taskOverwriteOnCommit = 
state.getPropAsBoolean(GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT, 
DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT);
+    if (taskOverwriteOnCommit) {
+      this.renameOptions = Options.Rename.OVERWRITE;
+    } else {
+      this.renameOptions = Options.Rename.NONE;
+    }
   }
 
   public FileAwareInputStreamDataWriter(State state, int numBranches, int 
branchId)
@@ -369,7 +383,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
    *
    * {@inheritDoc}
    *
-   * @see org.apache.gobblin.writer.DataWriter#commit()
+   * @see DataWriter#commit()
    */
   @Override
   public void commit()
@@ -394,11 +408,9 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
 
       ensureDirectoryExists(this.fs, outputFilePath.getParent(), 
ancestorOwnerAndPermissionIt);
 
-      if (!this.fs.rename(stagingFilePath, outputFilePath)) {
-        // target exists
-        throw new IOException(String.format("Could not commit file %s.", 
outputFilePath));
-      }
+      this.fileContext.rename(stagingFilePath, outputFilePath, renameOptions);
     } catch (IOException ioe) {
+      log.error("Could not commit file %s.", outputFilePath);
       // persist file
       this.recoveryHelper.persistFile(this.state, copyableFile, 
stagingFilePath);
       throw ioe;

Reply via email to