MLHR-1889 #resolve #comment moved atomic renaming to its own method
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5d8382d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5d8382d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5d8382d5 Branch: refs/heads/master Commit: 5d8382d51b12f0e47e83271b327d03cfe88b49b0 Parents: 73c8abf Author: Chandni Singh <[email protected]> Authored: Wed Nov 4 11:40:39 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Thu Nov 5 17:34:03 2015 -0800 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileOutputOperator.java | 61 +++++++++++++++----- 1 file changed, 46 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5d8382d5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index 744f024..fcbe1e8 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -21,7 +21,11 @@ package com.datatorrent.lib.io.fs; import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -29,31 +33,43 @@ import javax.annotation.Nonnull; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.cache.*; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; -import com.datatorrent.lib.counters.BasicCounters; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamCodec; import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.counters.BasicCounters; /** * This base implementation for a fault tolerant HDFS output operator, @@ -179,6 +195,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * The file system used to write to. */ protected transient FileSystem fs; + protected transient FileContext fileContext; protected short filePermission = 0777; @@ -455,7 +472,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp fsOutput.close(); inputStream.close(); - FileContext fileContext = FileContext.getFileContext(fs.getUri()); LOG.debug("active {} recovery {} ", filepath, recoveryFilePath); if (alwaysWriteToTmp) { @@ -464,7 +480,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp fileNameToTmpName.put(partFileName, recoveryFileName); } else { LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath()); - fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE); + rename(recoveryFilePath, status.getPath()); } } else { if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) { @@ -642,6 +658,22 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp } /** + * Renames source path to destination atomically. This relies on the FileContext api. If + * the underlying filesystem doesn't have an {@link AbstractFileSystem} then this should be overridden. + * + * @param source source path + * @param destination destination path + * @throws IOException + */ + protected void rename(Path source, Path destination) throws IOException + { + if (fileContext == null) { + fileContext = FileContext.getFileContext(fs.getUri()); + } + fileContext.rename(source, destination, Options.Rename.OVERWRITE); + } + + /** * Requests a file to be finalized. When it is writing to a rolling file, this will * request for finalizing the current open part and all the prev parts which weren't requested yet. * @@ -1206,13 +1238,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp protected void finalizeFile(String fileName) throws IOException { String tmpFileName = fileNameToTmpName.get(fileName); - FileContext fileContext = FileContext.getFileContext(fs.getUri()); Path srcPath = new Path(filePath + Path.SEPARATOR + tmpFileName); Path destPath = new Path(filePath + Path.SEPARATOR + fileName); if (!fs.exists(destPath)) { LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName); - fileContext.rename(srcPath, destPath); + rename(srcPath, destPath); } else if (fs.exists(srcPath)) { //if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so //we just delete the tmp file.
