Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 71184cdb1 -> 83dd2c572
[GOBBLIN-582] Allow file overwrite when copying from task staging to task output for FileAwareInputStreamDataWriter. Closes #2446 from sv2000/distcpTaskCommit Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/83dd2c57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/83dd2c57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/83dd2c57 Branch: refs/heads/master Commit: 83dd2c572d7dd117cdb6af3e896ffecc6fde66ca Parents: 71184cd Author: sv2000 <[email protected]> Authored: Tue Oct 2 09:34:03 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Oct 2 09:34:03 2018 -0700 ---------------------------------------------------------------------- .../writer/FileAwareInputStreamDataWriter.java | 30 ++++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/83dd2c57/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java index cda144e..bb9819e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java @@ -29,8 +29,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileContext; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; @@ -40,7 +42,6 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.collect.Iterators; -import com.google.common.io.Closer; import lombok.extern.slf4j.Slf4j; @@ -64,7 +65,6 @@ import org.apache.gobblin.data.management.copy.OwnerAndPermission; import org.apache.gobblin.data.management.copy.PreserveAttributes; import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper; import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter; -import org.apache.gobblin.metrics.event.sla.SlaEventKeys; import org.apache.gobblin.state.ConstructState; import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.FinalState; @@ -86,6 +86,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter"; public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize"; public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false; + public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit"; + public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false; protected final AtomicLong bytesWritten = new AtomicLong(); protected final AtomicLong filesWritten = new AtomicLong(); @@ -99,6 +101,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker; protected final int bufferSize; private final boolean checkFileSize; + private final Options.Rename renameOptions; + private final FileContext fileContext; protected final 
Meter copySpeedMeter; @@ -127,11 +131,15 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA this.writerAttemptIdOptional = Optional.fromNullable(writerAttemptId); - String uri = this.state.getProp( + String uriStr = this.state.getProp( ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchId), ConfigurationKeys.LOCAL_FS_URI); - this.fs = FileSystem.get(URI.create(uri), WriterUtils.getFsConfiguration(state)); + Configuration conf = WriterUtils.getFsConfiguration(state); + URI uri = URI.create(uriStr); + this.fs = FileSystem.get(uri, conf); + this.fileContext = FileContext.getFileContext(uri, conf); + this.stagingDir = this.writerAttemptIdOptional.isPresent() ? WriterUtils .getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get()) : WriterUtils.getWriterStagingDir(state, numBranches, branchId); @@ -148,6 +156,12 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA .getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, this.state, numBranches, branchId); this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE); + boolean taskOverwriteOnCommit = state.getPropAsBoolean(GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT, DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT); + if (taskOverwriteOnCommit) { + this.renameOptions = Options.Rename.OVERWRITE; + } else { + this.renameOptions = Options.Rename.NONE; + } } public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId) @@ -369,7 +383,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA * * {@inheritDoc} * - * @see org.apache.gobblin.writer.DataWriter#commit() + * @see DataWriter#commit() */ @Override public void commit() @@ -394,11 +408,9 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA ensureDirectoryExists(this.fs, outputFilePath.getParent(), 
ancestorOwnerAndPermissionIt); - if (!this.fs.rename(stagingFilePath, outputFilePath)) { - // target exists - throw new IOException(String.format("Could not commit file %s.", outputFilePath)); - } + this.fileContext.rename(stagingFilePath, outputFilePath, renameOptions); } catch (IOException ioe) { + log.error("Could not commit file %s.", outputFilePath); // persist file this.recoveryHelper.persistFile(this.state, copyableFile, stagingFilePath); throw ioe;
