[BEAM-951] FileBasedSink: merge FileOperations into IOChannelFactory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7be3f2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7be3f2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7be3f2f Branch: refs/heads/master Commit: a7be3f2f19d9112d690997f4571ccfadd6a0de25 Parents: 479c19a Author: Pei He <[email protected]> Authored: Wed Nov 9 17:09:13 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Nov 17 10:54:55 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 154 +------------------ .../beam/sdk/util/FileIOChannelFactory.java | 49 ++++++ .../beam/sdk/util/GcsIOChannelFactory.java | 10 ++ .../apache/beam/sdk/util/IOChannelFactory.java | 22 +++ 4 files changed, 85 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index f11fbee..5375b90 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; @@ -25,16 +24,11 @@ import static com.google.common.base.Strings.isNullOrEmpty; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; -import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -48,10 +42,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.FileIOChannelFactory; -import org.apache.beam.sdk.util.GcsIOChannelFactory; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; @@ -415,9 +405,8 @@ public abstract class FileBasedSink<T> extends Sink<T> { if (numFiles > 0) { LOG.debug("Copying {} files.", numFiles); - FileOperations fileOperations = - FileOperationsFactory.getFileOperations(destFilenames.get(0), options); - fileOperations.copy(srcFilenames, destFilenames); + IOChannelUtils.getFactory(destFilenames.get(0)) + .copy(srcFilenames, destFilenames); } else { LOG.info("No output files to write."); } @@ -458,8 +447,6 @@ public abstract class FileBasedSink<T> extends Sink<T> { protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options) throws IOException { LOG.debug("Removing temporary bundle output files in {}.", tempDirectory); - FileOperations fileOperations = - FileOperationsFactory.getFileOperations(tempDirectory, options); IOChannelFactory factory = IOChannelUtils.getFactory(tempDirectory); // To partially mitigate the effects of filesystems with eventually-consistent @@ -477,8 +464,8 @@ public abstract class FileBasedSink<T> extends Sink<T> { tempDirectory, matches.size(), allMatches.size() - matches.size()); - fileOperations.remove(allMatches); - fileOperations.remove(ImmutableList.of(tempDirectory)); + factory.remove(allMatches); + factory.remove(ImmutableList.of(tempDirectory)); } /** @@ -640,139 +627,6 @@ public abstract class FileBasedSink<T> extends Sink<T> { } } - // File system operations - // Warning: These class are purposefully private and will be replaced by more robust file I/O - // utilities. Not for use outside FileBasedSink. - - /** - * Factory for FileOperations. - */ - private static class FileOperationsFactory { - /** - * Return a FileOperations implementation based on which IOChannel would be used to write to a - * location specification (not necessarily a filename, as it may contain wildcards). - * - * <p>Only supports File and GCS locations (currently, the only factories registered with - * IOChannelUtils). For other locations, an exception is thrown. - */ - public static FileOperations getFileOperations(String spec, PipelineOptions options) - throws IOException { - IOChannelFactory factory = IOChannelUtils.getFactory(spec); - if (factory instanceof GcsIOChannelFactory) { - return new GcsOperations(options); - } else if (factory instanceof FileIOChannelFactory) { - return new LocalFileOperations(factory); - } else { - throw new IOException("Unrecognized file system."); - } - } - } - - /** - * Copy and Remove operations for files. Operations behave like remove-if-existing and - * copy-if-existing and do not throw exceptions on file not found to enable retries of these - * operations in the case of transient error. - */ - private interface FileOperations { - /** - * Copies a collection of files from one location to another. - * - * <p>The number of source filenames must equal the number of destination filenames. - * - * @param srcFilenames the source filenames. - * @param destFilenames the destination filenames. - */ - void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException; - - /** - * Removes a collection of files or directories. - * - * <p>Directories are required to be empty. Non-empty directories will not be deleted, - * and this method may return silently or throw an exception. - */ - void remove(Collection<String> filesOrDirs) throws IOException; - } - - /** - * GCS file system operations. - */ - private static class GcsOperations implements FileOperations { - private final GcsUtil gcsUtil; - - GcsOperations(PipelineOptions options) { - gcsUtil = new GcsUtilFactory().create(options); - } - - @Override - public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { - gcsUtil.copy(srcFilenames, destFilenames); - } - - @Override - public void remove(Collection<String> filesOrDirs) throws IOException { - gcsUtil.remove(filesOrDirs); - } - } - - /** - * File systems supported by {@link Files}. - */ - private static class LocalFileOperations implements FileOperations { - private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class); - - private final IOChannelFactory factory; - - LocalFileOperations(IOChannelFactory factory) { - this.factory = factory; - } - - @Override - public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { - checkArgument( - srcFilenames.size() == destFilenames.size(), - "Number of source files %s must equal number of destination files %s", - srcFilenames.size(), - destFilenames.size()); - int numFiles = srcFilenames.size(); - for (int i = 0; i < numFiles; i++) { - String src = srcFilenames.get(i); - String dst = destFilenames.get(i); - LOG.debug("Copying {} to {}", src, dst); - copyOne(src, dst); - } - } - - private void copyOne(String source, String destination) throws IOException { - try { - // Copy the source file, replacing the existing destination. - // Paths.get(x) will not work on win cause of the ":" after the drive letter - Files.copy( - new File(source).toPath(), - new File(destination).toPath(), - StandardCopyOption.REPLACE_EXISTING); - } catch (NoSuchFileException e) { - LOG.debug("{} does not exist.", source); - // Suppress exception if file does not exist. - } - } - - @Override - public void remove(Collection<String> filesOrDirs) throws IOException { - for (String fileOrDir : filesOrDirs) { - LOG.debug("Removing file {}", fileOrDir); - removeOne(fileOrDir); - } - } - - private void removeOne(String fileOrDir) throws IOException { - // Delete the file if it exists. - boolean exists = Files.deleteIfExists(Paths.get(fileOrDir)); - if (!exists) { - LOG.debug("Tried to delete {}, but it did not exist", fileOrDir); - } - } - } - /** * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index b5d85fc..5cba970 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; @@ -36,6 +38,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; @@ -159,4 +162,50 @@ public class FileIOChannelFactory implements IOChannelFactory { public Path toPath(String path) { return specToFile(path).toPath(); } + + @Override + public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + checkArgument( + srcFilenames.size() == destFilenames.size(), + "Number of source files %s must equal number of destination files %s", + srcFilenames.size(), + destFilenames.size()); + int numFiles = srcFilenames.size(); + for (int i = 0; i < numFiles; i++) { + String src = srcFilenames.get(i); + String dst = destFilenames.get(i); + LOG.debug("Copying {} to {}", src, dst); + copyOne(src, dst); + } + } + + private void copyOne(String source, String destination) throws IOException { + try { + // Copy the source file, replacing the existing destination. + // Paths.get(x) will not work on win cause of the ":" after the drive letter + Files.copy( + new File(source).toPath(), + new File(destination).toPath(), + StandardCopyOption.REPLACE_EXISTING); + } catch (NoSuchFileException e) { + LOG.debug("{} does not exist.", source); + // Suppress exception if file does not exist. + } + } + + @Override + public void remove(Collection<String> filesOrDirs) throws IOException { + for (String fileOrDir : filesOrDirs) { + LOG.debug("Removing file {}", fileOrDir); + removeOne(fileOrDir); + } + } + + private void removeOne(String fileOrDir) throws IOException { + // Delete the file if it exists. + boolean exists = Files.deleteIfExists(Paths.get(fileOrDir)); + if (!exists) { + LOG.debug("Tried to delete {}, but it did not exist", fileOrDir); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java index 652e468..bd2ec4e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -89,4 +89,14 @@ public class GcsIOChannelFactory implements IOChannelFactory { public Path toPath(String path) { return GcsPath.fromUri(path); } + + @Override + public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + options.getGcsUtil().copy(srcFilenames, destFilenames); + } + + @Override + public void remove(Collection<String> filesOrDirs) throws IOException { + options.getGcsUtil().remove(filesOrDirs); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java index 4e55036..9504f45 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java @@ -23,6 +23,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Path; import java.util.Collection; +import java.util.List; /** * Defines a factory for working with read and write channels. @@ -31,7 +32,10 @@ import java.util.Collection; * * <p>See <a href="http://docs.oracle.com/javase/7/docs/api/java/nio/channels/package-summary.html" * >Java NIO Channels</a> + * + * @deprecated This is under redesign, see: https://issues.apache.org/jira/browse/BEAM-59. */ +@Deprecated public interface IOChannelFactory { /** @@ -103,4 +107,22 @@ public interface IOChannelFactory { /** Converts the given string to a {@link Path}. */ Path toPath(String path); + + /** + * Copies a collection of files from one location to another. + * + * <p>The number of source filenames must equal the number of destination filenames. + * + * @param srcFilenames the source filenames. + * @param destFilenames the destination filenames. + */ + void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException; + + /** + * Removes a collection of files or directories. + * + * <p>Directories are required to be empty. Non-empty directories will not be deleted, + * and this method may return silently or throw an exception. + */ + void remove(Collection<String> filesOrDirs) throws IOException; }
