http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 583af60..8102316 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.lang.reflect.TypeVariable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; @@ -50,10 +49,8 @@ import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -76,7 +73,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Instant; @@ -86,43 +82,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Abstract class for file-based 
output. An implementation of FileBasedSink writes file-based output - * and defines the format of output files (how values are written, headers/footers, MIME type, - * etc.). + * Abstract class for file-based output. An implementation of FileBasedSink writes file-based + * output and defines the format of output files (how values are written, headers/footers, MIME + * type, etc.). * * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink * and to create a {@link WriteOperation} that manages the process of writing to the sink. * * <p>The process of writing to file-based sink is as follows: - * * <ol> - * <li>An optional subclass-defined initialization, - * <li>a parallel write of bundles to temporary files, and finally, - * <li>these temporary files are renamed with final output filenames. + * <li>An optional subclass-defined initialization, + * <li>a parallel write of bundles to temporary files, and finally, + * <li>these temporary files are renamed with final output filenames. * </ol> * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the * event of failure/retry or for redundancy). However, exactly one of these executions will have its - * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link - * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles - * transform, so even redundant or retried bundles will have a unique way of identifying their - * output. + * result passed to the finalize method. Each call to {@link Writer#openWindowed} + * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called + * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of + * identifying + * their output. * * <p>The bundle id should be used to guarantee that a bundle's output is unique. 
This uniqueness * guarantee is important; if a bundle is to be output to a file, for example, the name of the file * will encode the unique bundle id to avoid conflicts with other writers. * - * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output - * filenames, and this policy object can be used to write windowed or triggered PCollections into - * separate files per window pane. This allows file output from unbounded PCollections, and also - * works for bounded PCollecctions. + * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output + * filenames, and this policy object can be used to write windowed or triggered + * PCollections into separate files per window pane. This allows file output from unbounded + * PCollections, and also works for bounded PCollections. * * <p>Supported file systems are those registered with {@link FileSystems}. * - * @param <OutputT> the type of values written to the sink. + * @param <T> the type of values written to the sink. */ @Experimental(Kind.FILESYSTEM) -public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData { +public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); /** @@ -177,7 +173,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } @Override - public String getSuggestedFilenameSuffix() { + public String getFilenameSuffix() { return filenameSuffix; } @@ -209,8 +205,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } - private final DynamicDestinations<?, DestinationT> dynamicDestinations; - /** * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the * underlying channel. 
The default is to not compress the output using @@ -219,70 +213,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab private final WritableByteChannelFactory writableByteChannelFactory; /** - * A class that allows value-dependent writes in {@link FileBasedSink}. - * - * <p>Users can define a custom type to represent destinations, and provide a mapping to turn this - * destination type into an instance of {@link FilenamePolicy}. + * A naming policy for output files. */ - @Experimental(Kind.FILESYSTEM) - public abstract static class DynamicDestinations<UserT, DestinationT> - implements HasDisplayData, Serializable { - /** - * Returns an object that represents at a high level the destination being written to. May not - * return null. - */ - public abstract DestinationT getDestination(UserT element); - - /** - * Returns the default destination. This is used for collections that have no elements as the - * destination to write empty files to. - */ - public abstract DestinationT getDefaultDestination(); - - /** - * Returns the coder for {@link DestinationT}. If this is not overridden, then the coder - * registry will be use to find a suitable coder. This must be a deterministic coder, as {@link - * DestinationT} will be used as a key type in a {@link - * org.apache.beam.sdk.transforms.GroupByKey}. - */ - @Nullable - public Coder<DestinationT> getDestinationCoder() { - return null; - } - - /** Converts a destination into a {@link FilenamePolicy}. May not return null. */ - public abstract FilenamePolicy getFilenamePolicy(DestinationT destination); - - /** Populates the display data. */ - @Override - public void populateDisplayData(DisplayData.Builder builder) {} - - // Gets the destination coder. If the user does not provide one, try to find one in the coder - // registry. If no coder can be found, throws CannotProvideCoderException. 
- final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry) - throws CannotProvideCoderException { - Coder<DestinationT> destinationCoder = getDestinationCoder(); - if (destinationCoder != null) { - return destinationCoder; - } - // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. - // We must first use reflection to figure out what the type parameter is. - TypeDescriptor<?> superDescriptor = - TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class); - if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) { - throw new AssertionError( - "Couldn't find the DynamicDestinations superclass of " + this.getClass()); - } - TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT"); - @SuppressWarnings("unchecked") - TypeDescriptor<DestinationT> descriptor = - (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable); - return registry.getCoder(descriptor); - } - } - - /** A naming policy for output files. */ - @Experimental(Kind.FILESYSTEM) public abstract static class FilenamePolicy implements Serializable { /** * Context used for generating a name based on shard number, and num shards. @@ -355,28 +287,29 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab /** * When a sink has requested windowed or triggered output, this method will be invoked to return * the file {@link ResourceId resource} to be created given the base output directory and a - * {@link OutputFileHints} containing information about the file, including a suggested - * extension (e.g. coming from {@link CompressionType}). + * (possibly empty) extension from {@link FileBasedSink} configuration + * (e.g., {@link CompressionType}). * - * <p>The {@link WindowedContext} object gives access to the window and pane, as well as - * sharding information. The policy must return unique and consistent filenames for different - * windows and panes. 
+ * <p>The {@link WindowedContext} object gives access to the window and pane, + * as well as sharding information. The policy must return unique and consistent filenames + * for different windows and panes. */ @Experimental(Kind.FILESYSTEM) - public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints); + public abstract ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext c, String extension); /** * When a sink has not requested windowed or triggered output, this method will be invoked to * return the file {@link ResourceId resource} to be created given the base output directory and - * a {@link OutputFileHints} containing information about the file, including a suggested (e.g. - * coming from {@link CompressionType}). + * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration + * (e.g., {@link CompressionType}). * * <p>The {@link Context} object only provides sharding information, which is used by the policy * to generate unique and consistent filenames. */ @Experimental(Kind.FILESYSTEM) - @Nullable - public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints); + @Nullable public abstract ResourceId unwindowedFilename( + ResourceId outputDirectory, Context c, String extension); /** * Populates the display data. @@ -385,8 +318,19 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } + /** The policy used to generate names of files to be produced. */ + private final FilenamePolicy filenamePolicy; /** The directory to which files will be written. */ - private final ValueProvider<ResourceId> tempDirectoryProvider; + private final ValueProvider<ResourceId> baseOutputDirectoryProvider; + + /** + * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files. 
+ */ + @Experimental(Kind.FILESYSTEM) + public FileBasedSink( + ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) { + this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED); + } private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> { @Override @@ -396,91 +340,95 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } /** - * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files. + * Construct a {@link FileBasedSink} with the given filename policy and output channel type. */ @Experimental(Kind.FILESYSTEM) public FileBasedSink( - ValueProvider<ResourceId> tempDirectoryProvider, - DynamicDestinations<?, DestinationT> dynamicDestinations) { - this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED); - } - - /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */ - @Experimental(Kind.FILESYSTEM) - public FileBasedSink( - ValueProvider<ResourceId> tempDirectoryProvider, - DynamicDestinations<?, DestinationT> dynamicDestinations, + ValueProvider<ResourceId> baseOutputDirectoryProvider, + FilenamePolicy filenamePolicy, WritableByteChannelFactory writableByteChannelFactory) { - this.tempDirectoryProvider = - NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory()); - this.dynamicDestinations = checkNotNull(dynamicDestinations); + this.baseOutputDirectoryProvider = + NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory()); + this.filenamePolicy = filenamePolicy; this.writableByteChannelFactory = writableByteChannelFactory; } - /** Return the {@link DynamicDestinations} used. 
*/ - @SuppressWarnings("unchecked") - public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() { - return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations; + /** + * Returns the base directory inside which files will be written according to the configured + * {@link FilenamePolicy}. + */ + @Experimental(Kind.FILESYSTEM) + public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() { + return baseOutputDirectoryProvider; } /** - * Returns the directory inside which temprary files will be written according to the configured - * {@link FilenamePolicy}. + * Returns the policy by which files will be named inside of the base output directory. Note that + * the {@link FilenamePolicy} may itself specify one or more inner directories before each output + * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format. */ @Experimental(Kind.FILESYSTEM) - public ValueProvider<ResourceId> getTempDirectoryProvider() { - return tempDirectoryProvider; + public final FilenamePolicy getFilenamePolicy() { + return filenamePolicy; } public void validate(PipelineOptions options) {} - /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */ - public abstract WriteOperation<OutputT, DestinationT> createWriteOperation(); + /** + * Return a subclass of {@link WriteOperation} that will manage the write + * to the sink. + */ + public abstract WriteOperation<T> createWriteOperation(); public void populateDisplayData(DisplayData.Builder builder) { - getDynamicDestinations().populateDisplayData(builder); + getFilenamePolicy().populateDisplayData(builder); } /** * Abstract operation that manages the process of writing to {@link FileBasedSink}. * - * <p>The primary responsibilities of the WriteOperation is the management of output files. During - * a write, {@link Writer}s write bundles to temporary file locations. 
After the bundles have been - * written, - * + * <p>The primary responsibilities of the WriteOperation is the management of output + * files. During a write, {@link Writer}s write bundles to temporary file + * locations. After the bundles have been written, * <ol> - * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the - * output bundles. - * <li>During finalize, these temporary files are copied to final output locations and named - * according to a file naming template. - * <li>Finally, any temporary files that were created during the write are removed. + * <li>{@link WriteOperation#finalize} is given a list of the temporary + * files containing the output bundles. + * <li>During finalize, these temporary files are copied to final output locations and named + * according to a file naming template. + * <li>Finally, any temporary files that were created during the write are removed. * </ol> * - * <p>Subclass implementations of WriteOperation must implement {@link - * WriteOperation#createWriter} to return a concrete FileBasedSinkWriter. + * <p>Subclass implementations of WriteOperation must implement + * {@link WriteOperation#createWriter} to return a concrete + * FileBasedSinkWriter. * - * <h2>Temporary and Output File Naming:</h2> + * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary + * files using the tempDirectory that can be provided via the constructor of + * WriteOperation. These temporary files will be named + * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. + * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a + * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723". * - * <p>During the write, bundles are written to temporary files using the tempDirectory that can be - * provided via the constructor of WriteOperation. 
These temporary files will be named {@code - * {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. For example, if - * tempDirectory is "gs://my-bucket/my_temp_output", the output for a bundle with bundle id 15723 - * will be "gs://my-bucket/my_temp_output/15723". + * <p>Final output files are written to baseOutputFilename with the format + * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles + * written and extension is the file extension. Both baseOutputFilename and extension are required + * constructor arguments. * - * <p>Final output files are written to the location specified by the {@link FilenamePolicy}. If - * no filename policy is specified, then the {@link DefaultFilenamePolicy} will be used. The - * directory that the files are written to is determined by the {@link FilenamePolicy} instance. + * <p>Subclass implementations can change the file naming template by supplying a value for + * fileNamingTemplate. * * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary * files will occur. * * <p>If there are no elements in the PCollection being written, no output will be generated. * - * @param <OutputT> the type of values written to the sink. + * @param <T> the type of values written to the sink. */ - public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable { - /** The Sink that this WriteOperation will write to. */ - protected final FileBasedSink<OutputT, DestinationT> sink; + public abstract static class WriteOperation<T> implements Serializable { + /** + * The Sink that this WriteOperation will write to. + */ + protected final FileBasedSink<T> sink; /** Directory for temporary output files. 
*/ protected final ValueProvider<ResourceId> tempDirectory; @@ -497,19 +445,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } /** - * Constructs a WriteOperation using the default strategy for generating a temporary directory - * from the base output filename. + * Constructs a WriteOperation using the default strategy for generating a temporary + * directory from the base output filename. * - * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if - * tempDirectory is /path/to/foo/, the temporary directory will be - * /path/to/foo/temp-beam-foo-$date. + * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is + * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date. * * @param sink the FileBasedSink that will be used to configure this write operation. */ - public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) { - this( - sink, - NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder())); + public WriteOperation(FileBasedSink<T> sink) { + this(sink, NestedValueProvider.of( + sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder())); } private static class TemporaryDirectoryBuilder @@ -525,12 +471,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab private final Long tempId = TEMP_COUNT.getAndIncrement(); @Override - public ResourceId apply(ResourceId tempDirectory) { + public ResourceId apply(ResourceId baseOutputDirectory) { // Temp directory has a timestamp and a unique ID String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId); - return tempDirectory - .getCurrentDirectory() - .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY); + return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY); } } @@ -541,22 +485,22 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements 
Serializab * @param tempDirectory the base directory to be used for temporary output files. */ @Experimental(Kind.FILESYSTEM) - public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) { + public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) { this(sink, StaticValueProvider.of(tempDirectory)); } private WriteOperation( - FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) { + FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) { this.sink = sink; this.tempDirectory = tempDirectory; this.windowedWrites = false; } /** - * Clients must implement to return a subclass of {@link Writer}. This method must not mutate - * the state of the object. + * Clients must implement to return a subclass of {@link Writer}. This + * method must not mutate the state of the object. */ - public abstract Writer<OutputT, DestinationT> createWriter() throws Exception; + public abstract Writer<T> createWriter() throws Exception; /** * Indicates that the operation will be performing windowed writes. @@ -570,8 +514,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * removing temporary files. * * <p>Finalization may be overridden by subclass implementations to perform customized - * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code - * writerResults} contains the filenames of written bundles. + * finalization (e.g., initiating some operation on output bundles, merging them, etc.). + * {@code writerResults} contains the filenames of written bundles. * * <p>If subclasses override this method, they must guarantee that its implementation is * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It @@ -579,7 +523,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * * @param writerResults the results of writes (FileResult). 
*/ - public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception { + public void finalize(Iterable<FileResult> writerResults) throws Exception { // Collect names of temporary files and rename them. Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults); copyToOutputFiles(outputFilenames); @@ -598,14 +542,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab @Experimental(Kind.FILESYSTEM) protected final Map<ResourceId, ResourceId> buildOutputFilenames( - Iterable<FileResult<DestinationT>> writerResults) { + Iterable<FileResult> writerResults) { int numShards = Iterables.size(writerResults); Map<ResourceId, ResourceId> outputFilenames = new HashMap<>(); + FilenamePolicy policy = getSink().getFilenamePolicy(); + ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get(); + // Either all results have a shard number set (if the sink is configured with a fixed // number of shards), or they all don't (otherwise). Boolean isShardNumberSetEverywhere = null; - for (FileResult<DestinationT> result : writerResults) { + for (FileResult result : writerResults) { boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM); if (isShardNumberSetEverywhere == null) { isShardNumberSetEverywhere = isShardNumberSetHere; @@ -621,7 +568,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab isShardNumberSetEverywhere = true; } - List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList(); + List<FileResult> resultsWithShardNumbers = Lists.newArrayList(); if (isShardNumberSetEverywhere) { resultsWithShardNumbers = Lists.newArrayList(writerResults); } else { @@ -630,32 +577,29 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab // case of triggers, the list of FileResult objects in the Finalize iterable is not // deterministic, and might change over retries. 
This breaks the assumption below that // sorting the FileResult objects provides idempotency. - List<FileResult<DestinationT>> sortedByTempFilename = + List<FileResult> sortedByTempFilename = Ordering.from( - new Comparator<FileResult<DestinationT>>() { - @Override - public int compare( - FileResult<DestinationT> first, FileResult<DestinationT> second) { - String firstFilename = first.getTempFilename().toString(); - String secondFilename = second.getTempFilename().toString(); - return firstFilename.compareTo(secondFilename); - } - }) + new Comparator<FileResult>() { + @Override + public int compare(FileResult first, FileResult second) { + String firstFilename = first.getTempFilename().toString(); + String secondFilename = second.getTempFilename().toString(); + return firstFilename.compareTo(secondFilename); + } + }) .sortedCopy(writerResults); for (int i = 0; i < sortedByTempFilename.size(); i++) { resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i)); } } - for (FileResult<DestinationT> result : resultsWithShardNumbers) { + for (FileResult result : resultsWithShardNumbers) { checkArgument( result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result); outputFilenames.put( result.getTempFilename(), result.getDestinationFile( - getSink().getDynamicDestinations(), - numShards, - getSink().getWritableByteChannelFactory())); + policy, baseOutputDir, numShards, getSink().getExtension())); } int numDistinctShards = new HashSet<>(outputFilenames.values()).size(); @@ -671,18 +615,18 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * * <p>Can be called from subclasses that override {@link WriteOperation#finalize}. * - * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files - * will be the same as the sorted order of the input filenames. 
In other words (when using - * {@link DefaultFilenamePolicy}), if the input filenames are ["C", "A", "B"], baseFilename (int - * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is - * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B - * will be copied to dir/file-001-of-003.txt, etc. + * <p>Files will be named according to the file naming template. The order of the output files + * will be the same as the sorted order of the input filenames. In other words, if the input + * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and + * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to + * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc. * * @param filenames the filenames of temporary files. */ @VisibleForTesting @Experimental(Kind.FILESYSTEM) - final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException { + final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) + throws IOException { int numFiles = filenames.size(); if (numFiles > 0) { LOG.debug("Copying {} files.", numFiles); @@ -754,8 +698,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } - /** Returns the FileBasedSink for this write operation. */ - public FileBasedSink<OutputT, DestinationT> getSink() { + /** + * Returns the FileBasedSink for this write operation. + */ + public FileBasedSink<T> getSink() { return sink; } @@ -773,28 +719,33 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } - /** Returns the {@link WritableByteChannelFactory} used. */ - protected final WritableByteChannelFactory getWritableByteChannelFactory() { - return writableByteChannelFactory; + /** Returns the extension that will be written to the produced files. 
*/ + protected final String getExtension() { + String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), ""); + if (!extension.isEmpty() && !extension.startsWith(".")) { + extension = "." + extension; + } + return extension; } /** - * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass implementations - * provide a method that can write a single value to a {@link WritableByteChannel}. + * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass + * implementations provide a method that can write a single value to a + * {@link WritableByteChannel}. * * <p>Subclass implementations may also override methods that write headers and footers before and * after the values in a bundle, respectively, as well as provide a MIME type for the output * channel. * - * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore any - * access to static members or methods should be thread safe. + * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore + * any access to static members or methods should be thread safe. * - * @param <OutputT> the type of values to write. + * @param <T> the type of values to write. */ - public abstract static class Writer<OutputT, DestinationT> { + public abstract static class Writer<T> { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); - private final WriteOperation<OutputT, DestinationT> writeOperation; + private final WriteOperation<T> writeOperation; /** Unique id for this output bundle. */ private String id; @@ -802,7 +753,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab private BoundedWindow window; private PaneInfo paneInfo; private int shard = -1; - private DestinationT destination; /** The output file for this bundle. May be null if opening failed. 
*/ private @Nullable ResourceId outputFile; @@ -822,8 +772,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab */ private final String mimeType; - /** Construct a new {@link Writer} that will produce files of the given MIME type. */ - public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) { + /** + * Construct a new {@link Writer} that will produce files of the given MIME type. + */ + public Writer(WriteOperation<T> writeOperation, String mimeType) { checkNotNull(writeOperation); this.writeOperation = writeOperation; this.mimeType = mimeType; @@ -866,29 +818,28 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * id populated for the case of static sharding. In cases where the runner is dynamically * picking sharding, shard might be set to -1. */ - public final void openWindowed( - String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination) + public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard) throws Exception { if (!getWriteOperation().windowedWrites) { throw new IllegalStateException("openWindowed called a non-windowed sink."); } - open(uId, window, paneInfo, shard, destination); + open(uId, window, paneInfo, shard); } /** * Called for each value in the bundle. */ - public abstract void write(OutputT value) throws Exception; + public abstract void write(T value) throws Exception; /** - * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested. + * Similar to {@link #openWindowed} however for the case where unwindowed writes were + * requested. 
*/ - public final void openUnwindowed(String uId, int shard, DestinationT destination) - throws Exception { + public final void openUnwindowed(String uId, int shard) throws Exception { if (getWriteOperation().windowedWrites) { throw new IllegalStateException("openUnwindowed called a windowed sink."); } - open(uId, null, null, shard, destination); + open(uId, null, null, shard); } // Helper function to close a channel, on exception cases. @@ -904,18 +855,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } - private void open( - String uId, - @Nullable BoundedWindow window, - @Nullable PaneInfo paneInfo, - int shard, - DestinationT destination) - throws Exception { + private void open(String uId, + @Nullable BoundedWindow window, + @Nullable PaneInfo paneInfo, + int shard) throws Exception { this.id = uId; this.window = window; this.paneInfo = paneInfo; this.shard = shard; - this.destination = destination; ResourceId tempDirectory = getWriteOperation().tempDirectory.get(); outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE); verifyNotNull( @@ -961,7 +908,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } /** Closes the channel and returns the bundle result. 
*/ - public final FileResult<DestinationT> close() throws Exception { + public final FileResult close() throws Exception { checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile"); LOG.debug("Writing footer to {}.", outputFile); @@ -991,41 +938,35 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab throw new IOException(String.format("Failed closing channel to %s", outputFile), e); } - FileResult<DestinationT> result = - new FileResult<>(outputFile, shard, window, paneInfo, destination); + FileResult result = new FileResult(outputFile, shard, window, paneInfo); LOG.debug("Result for bundle {}: {}", this.id, outputFile); return result; } - /** Return the WriteOperation that this Writer belongs to. */ - public WriteOperation<OutputT, DestinationT> getWriteOperation() { + /** + * Return the WriteOperation that this Writer belongs to. + */ + public WriteOperation<T> getWriteOperation() { return writeOperation; } } /** - * Result of a single bundle write. Contains the filename produced by the bundle, and if known the - * final output filename. + * Result of a single bundle write. Contains the filename produced by the bundle, and if known + * the final output filename. 
*/ - public static final class FileResult<DestinationT> { + public static final class FileResult { private final ResourceId tempFilename; private final int shard; private final BoundedWindow window; private final PaneInfo paneInfo; - private final DestinationT destination; @Experimental(Kind.FILESYSTEM) - public FileResult( - ResourceId tempFilename, - int shard, - BoundedWindow window, - PaneInfo paneInfo, - DestinationT destination) { + public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) { this.tempFilename = tempFilename; this.shard = shard; this.window = window; this.paneInfo = paneInfo; - this.destination = destination; } @Experimental(Kind.FILESYSTEM) @@ -1037,8 +978,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab return shard; } - public FileResult<DestinationT> withShard(int shard) { - return new FileResult<>(tempFilename, shard, window, paneInfo, destination); + public FileResult withShard(int shard) { + return new FileResult(tempFilename, shard, window, paneInfo); } public BoundedWindow getWindow() { @@ -1049,24 +990,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab return paneInfo; } - public DestinationT getDestination() { - return destination; - } - @Experimental(Kind.FILESYSTEM) - public ResourceId getDestinationFile( - DynamicDestinations<?, DestinationT> dynamicDestinations, - int numShards, - OutputFileHints outputFileHints) { + public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory, + int numShards, String extension) { checkArgument(getShard() != UNKNOWN_SHARDNUM); checkArgument(numShards > 0); - FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination); if (getWindow() != null) { - return policy.windowedFilename( - new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards), - outputFileHints); + return policy.windowedFilename(outputDirectory, new WindowedContext( + 
getWindow(), getPaneInfo(), getShard(), numShards), extension); } else { - return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints); + return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards), + extension); } } @@ -1080,24 +1014,22 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } - /** A coder for {@link FileResult} objects. */ - public static final class FileResultCoder<DestinationT> - extends StructuredCoder<FileResult<DestinationT>> { + /** + * A coder for {@link FileResult} objects. + */ + public static final class FileResultCoder extends StructuredCoder<FileResult> { private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of(); private static final Coder<Integer> SHARD_CODER = VarIntCoder.of(); private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE); + private final Coder<BoundedWindow> windowCoder; - private final Coder<DestinationT> destinationCoder; - protected FileResultCoder( - Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) { + protected FileResultCoder(Coder<BoundedWindow> windowCoder) { this.windowCoder = NullableCoder.of(windowCoder); - this.destinationCoder = destinationCoder; } - public static <DestinationT> FileResultCoder<DestinationT> of( - Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) { - return new FileResultCoder<>(windowCoder, destinationCoder); + public static FileResultCoder of(Coder<BoundedWindow> windowCoder) { + return new FileResultCoder(windowCoder); } @Override @@ -1106,7 +1038,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } @Override - public void encode(FileResult<DestinationT> value, OutputStream outStream) throws IOException { + public void encode(FileResult value, OutputStream outStream) + throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } @@ 
-1114,22 +1047,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab windowCoder.encode(value.getWindow(), outStream); PANE_INFO_CODER.encode(value.getPaneInfo(), outStream); SHARD_CODER.encode(value.getShard(), outStream); - destinationCoder.encode(value.getDestination(), outStream); } @Override - public FileResult<DestinationT> decode(InputStream inStream) throws IOException { + public FileResult decode(InputStream inStream) + throws IOException { String tempFilename = FILENAME_CODER.decode(inStream); BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream); int shard = SHARD_CODER.decode(inStream); - DestinationT destination = destinationCoder.decode(inStream); - return new FileResult<>( - FileSystems.matchNewResource(tempFilename, false /* isDirectory */), - shard, - window, - paneInfo, - destination); + return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */), + shard, window, paneInfo); } @Override @@ -1138,15 +1066,25 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab windowCoder.verifyDeterministic(); PANE_INFO_CODER.verifyDeterministic(); SHARD_CODER.verifyDeterministic(); - destinationCoder.verifyDeterministic(); } } /** - * Provides hints about how to generate output files, such as a suggested filename suffix (e.g. - * based on the compression type), and the file MIME type. + * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} + * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that + * would normally be written directly to the {@link WritableByteChannel} passed into + * {@link WritableByteChannelFactory#create(WritableByteChannel)}. + * + * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when + * building {@link DisplayData}. 
*/ - public interface OutputFileHints extends Serializable { + public interface WritableByteChannelFactory extends Serializable { + /** + * @param channel the {@link WritableByteChannel} to wrap + * @return the {@link WritableByteChannel} to be used during output + */ + WritableByteChannel create(WritableByteChannel channel) throws IOException; + /** * Returns the MIME type that should be used for the files that will hold the output data. May * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change @@ -1163,23 +1101,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP} */ @Nullable - String getSuggestedFilenameSuffix(); - } - - /** - * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} - * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that - * would normally be written directly to the {@link WritableByteChannel} passed into {@link - * WritableByteChannelFactory#create(WritableByteChannel)}. - * - * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when - * building {@link DisplayData}. - */ - public interface WritableByteChannelFactory extends OutputFileHints { - /** - * @param channel the {@link WritableByteChannel} to wrap - * @return the {@link WritableByteChannel} to be used during output - */ - WritableByteChannel create(WritableByteChannel channel) throws IOException; + String getFilenameSuffix(); } }
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index c3687a9..05f0d97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.io.range.OffsetRangeTracker; import org.apache.beam.sdk.io.range.RangeTracker; import org.apache.beam.sdk.options.PipelineOptions; @@ -111,7 +110,8 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { @Override public List<? extends OffsetBasedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - // Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle + // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to + // make sure that we do not end up with a too small bundle at the end. If the desired bundle // size is smaller than the minBundleSize of the source then minBundleSize will be used instead. 
long desiredBundleSizeOffsetUnits = Math.max( @@ -119,10 +119,20 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { minBundleSize); List<OffsetBasedSource<T>> subSources = new ArrayList<>(); - for (OffsetRange range : - new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options))) - .split(desiredBundleSizeOffsetUnits, minBundleSize)) { - subSources.add(createSourceForSubrange(range.getFrom(), range.getTo())); + long start = startOffset; + long maxEnd = Math.min(endOffset, getMaxEndOffset(options)); + + while (start < maxEnd) { + long end = start + desiredBundleSizeOffsetUnits; + end = Math.min(end, maxEnd); + // Avoid having a too small bundle at the end and ensure that we respect minBundleSize. + long remaining = maxEnd - end; + if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) { + end = maxEnd; + } + subSources.add(createSourceForSubrange(start, end)); + + start = end; } return subSources; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 6e7b243..e288075 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -356,11 +355,12 @@ public class 
TFRecordIO { public PDone expand(PCollection<byte[]> input) { checkState(getOutputPrefix() != null, "need to set the output prefix of a TFRecordIO.Write transform"); - WriteFiles<byte[], Void, byte[]> write = - WriteFiles.<byte[], Void, byte[]>to( + WriteFiles<byte[]> write = WriteFiles.to( new TFRecordSink( - getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()), - SerializableFunctions.<byte[]>identity()); + getOutputPrefix(), + getShardTemplate(), + getFilenameSuffix(), + getCompressionType())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -546,20 +546,20 @@ public class TFRecordIO { } } - /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */ + /** + * A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. + */ @VisibleForTesting - static class TFRecordSink extends FileBasedSink<byte[], Void> { + static class TFRecordSink extends FileBasedSink<byte[]> { @VisibleForTesting - TFRecordSink( - ValueProvider<ResourceId> outputPrefix, + TFRecordSink(ValueProvider<ResourceId> outputPrefix, @Nullable String shardTemplate, @Nullable String suffix, TFRecordIO.CompressionType compressionType) { super( outputPrefix, - DynamicFileDestinations.constant( - DefaultFilenamePolicy.fromStandardParameters( - outputPrefix, shardTemplate, suffix, false)), + DefaultFilenamePolicy.constructUsingStandardParameters( + outputPrefix, shardTemplate, suffix, false), writableByteChannelFactory(compressionType)); } @@ -571,7 +571,7 @@ public class TFRecordIO { } @Override - public WriteOperation<byte[], Void> createWriteOperation() { + public WriteOperation<byte[]> createWriteOperation() { return new TFRecordWriteOperation(this); } @@ -590,24 +590,30 @@ public class TFRecordIO { return CompressionType.UNCOMPRESSED; } - /** A {@link WriteOperation WriteOperation} for TFRecord files. 
*/ - private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> { + /** + * A {@link WriteOperation + * WriteOperation} for TFRecord files. + */ + private static class TFRecordWriteOperation extends WriteOperation<byte[]> { private TFRecordWriteOperation(TFRecordSink sink) { super(sink); } @Override - public Writer<byte[], Void> createWriter() throws Exception { + public Writer<byte[]> createWriter() throws Exception { return new TFRecordWriter(this); } } - /** A {@link Writer Writer} for TFRecord files. */ - private static class TFRecordWriter extends Writer<byte[], Void> { + /** + * A {@link Writer Writer} + * for TFRecord files. + */ + private static class TFRecordWriter extends Writer<byte[]> { private WritableByteChannel outChannel; private TFRecordCodec codec; - private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) { + private TFRecordWriter(WriteOperation<byte[]> writeOperation) { super(writeOperation, MimeTypes.BINARY); }
