http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
new file mode 100644
index 0000000..e7ef0f6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Static factories for commonly used {@link FileBasedSink.DynamicDestinations} instances. */
+public class DynamicFileDestinations {
+  /** Always returns a constant {@link FilenamePolicy}, using a single {@code Void} destination. */
+  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+    private final FilenamePolicy filenamePolicy;
+
+    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
+      this.filenamePolicy = filenamePolicy;
+    }
+
+    @Override
+    public Void getDestination(T element) {
+      return (Void) null;
+    }
+
+    @Nullable
+    @Override
+    public Coder<Void> getDestinationCoder() {
+      return null; // FileBasedSink then falls back to the CoderRegistry.
+    }
+
+    @Override
+    public Void getDefaultDestination() {
+      return (Void) null;
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(Void destination) {
+      return filenamePolicy;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      filenamePolicy.populateDisplayData(builder);
+    }
+  }
+
+  /**
+   * Maps each element to {@link Params} with a user function, and each {@link Params} to a
+   * correspondingly configured {@link DefaultFilenamePolicy}.
+   */
+  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
+    private final SerializableFunction<UserT, Params> destinationFunction;
+    private final Params emptyDestination;
+
+    public DefaultPolicyDestinations(
+        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+      this.destinationFunction = destinationFunction;
+      this.emptyDestination = emptyDestination;
+    }
+
+    @Override
+    public Params getDestination(UserT element) {
+      return destinationFunction.apply(element);
+    }
+
+    @Override
+    public Params getDefaultDestination() {
+      return emptyDestination;
+    }
+
+    @Override
+    public Coder<Params> getDestinationCoder() {
+      return ParamsCoder.of();
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(Params params) {
+      return DefaultFilenamePolicy.fromParams(params);
+    }
+  }
+
+  /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
+  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy);
+  }
+
+  /**
+   * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
+   * configured with the given {@link Params}.
+   */
+  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 8102316..583af60 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.lang.reflect.TypeVariable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; @@ -49,8 +50,10 @@ import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -73,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Instant; @@ -82,43 +86,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Abstract class for file-based 
output. An implementation of FileBasedSink writes file-based - * output and defines the format of output files (how values are written, headers/footers, MIME - * type, etc.). + * Abstract class for file-based output. An implementation of FileBasedSink writes file-based output + * and defines the format of output files (how values are written, headers/footers, MIME type, + * etc.). * * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink * and to create a {@link WriteOperation} that manages the process of writing to the sink. * * <p>The process of writing to file-based sink is as follows: + * * <ol> - * <li>An optional subclass-defined initialization, - * <li>a parallel write of bundles to temporary files, and finally, - * <li>these temporary files are renamed with final output filenames. + * <li>An optional subclass-defined initialization, + * <li>a parallel write of bundles to temporary files, and finally, + * <li>these temporary files are renamed with final output filenames. * </ol> * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the * event of failure/retry or for redundancy). However, exactly one of these executions will have its - * result passed to the finalize method. Each call to {@link Writer#openWindowed} - * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called - * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of - * identifying - * their output. + * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link + * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles + * transform, so even redundant or retried bundles will have a unique way of identifying their + * output. * * <p>The bundle id should be used to guarantee that a bundle's output is unique. 
This uniqueness * guarantee is important; if a bundle is to be output to a file, for example, the name of the file * will encode the unique bundle id to avoid conflicts with other writers. * - * {@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output - * filenames, and this policy object can be used to write windowed or triggered - * PCollections into separate files per window pane. This allows file output from unbounded - * PCollections, and also works for bounded PCollecctions. + * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output + * filenames, and this policy object can be used to write windowed or triggered PCollections into + * separate files per window pane. This allows file output from unbounded PCollections, and also + * works for bounded PCollecctions. * * <p>Supported file systems are those registered with {@link FileSystems}. * - * @param <T> the type of values written to the sink. + * @param <OutputT> the type of values written to the sink. */ @Experimental(Kind.FILESYSTEM) -public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { +public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); /** @@ -173,7 +177,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } @Override - public String getFilenameSuffix() { + public String getSuggestedFilenameSuffix() { return filenameSuffix; } @@ -205,6 +209,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } } + private final DynamicDestinations<?, DestinationT> dynamicDestinations; + /** * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the * underlying channel. 
The default is to not compress the output using @@ -213,8 +219,70 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private final WritableByteChannelFactory writableByteChannelFactory; /** - * A naming policy for output files. + * A class that allows value-dependent writes in {@link FileBasedSink}. + * + * <p>Users can define a custom type to represent destinations, and provide a mapping to turn this + * destination type into an instance of {@link FilenamePolicy}. */ + @Experimental(Kind.FILESYSTEM) + public abstract static class DynamicDestinations<UserT, DestinationT> + implements HasDisplayData, Serializable { + /** + * Returns an object that represents at a high level the destination being written to. May not + * return null. + */ + public abstract DestinationT getDestination(UserT element); + + /** + * Returns the default destination. This is used for collections that have no elements as the + * destination to write empty files to. + */ + public abstract DestinationT getDefaultDestination(); + + /** + * Returns the coder for {@link DestinationT}. If this is not overridden, then the coder + * registry will be use to find a suitable coder. This must be a deterministic coder, as {@link + * DestinationT} will be used as a key type in a {@link + * org.apache.beam.sdk.transforms.GroupByKey}. + */ + @Nullable + public Coder<DestinationT> getDestinationCoder() { + return null; + } + + /** Converts a destination into a {@link FilenamePolicy}. May not return null. */ + public abstract FilenamePolicy getFilenamePolicy(DestinationT destination); + + /** Populates the display data. */ + @Override + public void populateDisplayData(DisplayData.Builder builder) {} + + // Gets the destination coder. If the user does not provide one, try to find one in the coder + // registry. If no coder can be found, throws CannotProvideCoderException. 
+ final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry) + throws CannotProvideCoderException { + Coder<DestinationT> destinationCoder = getDestinationCoder(); + if (destinationCoder != null) { + return destinationCoder; + } + // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. + // We must first use reflection to figure out what the type parameter is. + TypeDescriptor<?> superDescriptor = + TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class); + if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) { + throw new AssertionError( + "Couldn't find the DynamicDestinations superclass of " + this.getClass()); + } + TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT"); + @SuppressWarnings("unchecked") + TypeDescriptor<DestinationT> descriptor = + (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable); + return registry.getCoder(descriptor); + } + } + + /** A naming policy for output files. */ + @Experimental(Kind.FILESYSTEM) public abstract static class FilenamePolicy implements Serializable { /** * Context used for generating a name based on shard number, and num shards. @@ -287,29 +355,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { /** * When a sink has requested windowed or triggered output, this method will be invoked to return * the file {@link ResourceId resource} to be created given the base output directory and a - * (possibly empty) extension from {@link FileBasedSink} configuration - * (e.g., {@link CompressionType}). + * {@link OutputFileHints} containing information about the file, including a suggested + * extension (e.g. coming from {@link CompressionType}). * - * <p>The {@link WindowedContext} object gives access to the window and pane, - * as well as sharding information. The policy must return unique and consistent filenames - * for different windows and panes. 
+ * <p>The {@link WindowedContext} object gives access to the window and pane, as well as + * sharding information. The policy must return unique and consistent filenames for different + * windows and panes. */ @Experimental(Kind.FILESYSTEM) - public abstract ResourceId windowedFilename( - ResourceId outputDirectory, WindowedContext c, String extension); + public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints); /** * When a sink has not requested windowed or triggered output, this method will be invoked to * return the file {@link ResourceId resource} to be created given the base output directory and - * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration - * (e.g., {@link CompressionType}). + * a {@link OutputFileHints} containing information about the file, including a suggested (e.g. + * coming from {@link CompressionType}). * * <p>The {@link Context} object only provides sharding information, which is used by the policy * to generate unique and consistent filenames. */ @Experimental(Kind.FILESYSTEM) - @Nullable public abstract ResourceId unwindowedFilename( - ResourceId outputDirectory, Context c, String extension); + @Nullable + public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints); /** * Populates the display data. @@ -318,19 +385,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } } - /** The policy used to generate names of files to be produced. */ - private final FilenamePolicy filenamePolicy; /** The directory to which files will be written. */ - private final ValueProvider<ResourceId> baseOutputDirectoryProvider; - - /** - * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files. 
- */ - @Experimental(Kind.FILESYSTEM) - public FileBasedSink( - ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) { - this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED); - } + private final ValueProvider<ResourceId> tempDirectoryProvider; private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> { @Override @@ -340,95 +396,91 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } /** - * Construct a {@link FileBasedSink} with the given filename policy and output channel type. + * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files. */ @Experimental(Kind.FILESYSTEM) public FileBasedSink( - ValueProvider<ResourceId> baseOutputDirectoryProvider, - FilenamePolicy filenamePolicy, + ValueProvider<ResourceId> tempDirectoryProvider, + DynamicDestinations<?, DestinationT> dynamicDestinations) { + this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED); + } + + /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */ + @Experimental(Kind.FILESYSTEM) + public FileBasedSink( + ValueProvider<ResourceId> tempDirectoryProvider, + DynamicDestinations<?, DestinationT> dynamicDestinations, WritableByteChannelFactory writableByteChannelFactory) { - this.baseOutputDirectoryProvider = - NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory()); - this.filenamePolicy = filenamePolicy; + this.tempDirectoryProvider = + NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory()); + this.dynamicDestinations = checkNotNull(dynamicDestinations); this.writableByteChannelFactory = writableByteChannelFactory; } - /** - * Returns the base directory inside which files will be written according to the configured - * {@link FilenamePolicy}. 
- */ - @Experimental(Kind.FILESYSTEM) - public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() { - return baseOutputDirectoryProvider; + /** Return the {@link DynamicDestinations} used. */ + @SuppressWarnings("unchecked") + public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() { + return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations; } /** - * Returns the policy by which files will be named inside of the base output directory. Note that - * the {@link FilenamePolicy} may itself specify one or more inner directories before each output - * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format. + * Returns the directory inside which temprary files will be written according to the configured + * {@link FilenamePolicy}. */ @Experimental(Kind.FILESYSTEM) - public final FilenamePolicy getFilenamePolicy() { - return filenamePolicy; + public ValueProvider<ResourceId> getTempDirectoryProvider() { + return tempDirectoryProvider; } public void validate(PipelineOptions options) {} - /** - * Return a subclass of {@link WriteOperation} that will manage the write - * to the sink. - */ - public abstract WriteOperation<T> createWriteOperation(); + /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */ + public abstract WriteOperation<OutputT, DestinationT> createWriteOperation(); public void populateDisplayData(DisplayData.Builder builder) { - getFilenamePolicy().populateDisplayData(builder); + getDynamicDestinations().populateDisplayData(builder); } /** * Abstract operation that manages the process of writing to {@link FileBasedSink}. * - * <p>The primary responsibilities of the WriteOperation is the management of output - * files. During a write, {@link Writer}s write bundles to temporary file - * locations. After the bundles have been written, + * <p>The primary responsibilities of the WriteOperation is the management of output files. 
During + * a write, {@link Writer}s write bundles to temporary file locations. After the bundles have been + * written, + * * <ol> - * <li>{@link WriteOperation#finalize} is given a list of the temporary - * files containing the output bundles. - * <li>During finalize, these temporary files are copied to final output locations and named - * according to a file naming template. - * <li>Finally, any temporary files that were created during the write are removed. + * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the + * output bundles. + * <li>During finalize, these temporary files are copied to final output locations and named + * according to a file naming template. + * <li>Finally, any temporary files that were created during the write are removed. * </ol> * - * <p>Subclass implementations of WriteOperation must implement - * {@link WriteOperation#createWriter} to return a concrete - * FileBasedSinkWriter. + * <p>Subclass implementations of WriteOperation must implement {@link + * WriteOperation#createWriter} to return a concrete FileBasedSinkWriter. * - * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary - * files using the tempDirectory that can be provided via the constructor of - * WriteOperation. These temporary files will be named - * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. - * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a - * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723". + * <h2>Temporary and Output File Naming:</h2> * - * <p>Final output files are written to baseOutputFilename with the format - * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles - * written and extension is the file extension. Both baseOutputFilename and extension are required - * constructor arguments. 
+ * <p>During the write, bundles are written to temporary files using the tempDirectory that can be + * provided via the constructor of WriteOperation. These temporary files will be named {@code + * {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. For example, if + * tempDirectory is "gs://my-bucket/my_temp_output", the output for a bundle with bundle id 15723 + * will be "gs://my-bucket/my_temp_output/15723". * - * <p>Subclass implementations can change the file naming template by supplying a value for - * fileNamingTemplate. + * <p>Final output files are written to the location specified by the {@link FilenamePolicy}. If + * no filename policy is specified, then the {@link DefaultFilenamePolicy} will be used. The + * directory that the files are written to is determined by the {@link FilenamePolicy} instance. * * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary * files will occur. * * <p>If there are no elements in the PCollection being written, no output will be generated. * - * @param <T> the type of values written to the sink. + * @param <OutputT> the type of values written to the sink. */ - public abstract static class WriteOperation<T> implements Serializable { - /** - * The Sink that this WriteOperation will write to. - */ - protected final FileBasedSink<T> sink; + public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable { + /** The Sink that this WriteOperation will write to. */ + protected final FileBasedSink<OutputT, DestinationT> sink; /** Directory for temporary output files. */ protected final ValueProvider<ResourceId> tempDirectory; @@ -445,17 +497,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } /** - * Constructs a WriteOperation using the default strategy for generating a temporary - * directory from the base output filename. 
+ * Constructs a WriteOperation using the default strategy for generating a temporary directory + * from the base output filename. * - * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is - * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date. + * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if + * tempDirectory is /path/to/foo/, the temporary directory will be + * /path/to/foo/temp-beam-foo-$date. * * @param sink the FileBasedSink that will be used to configure this write operation. */ - public WriteOperation(FileBasedSink<T> sink) { - this(sink, NestedValueProvider.of( - sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder())); + public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) { + this( + sink, + NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder())); } private static class TemporaryDirectoryBuilder @@ -471,10 +525,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private final Long tempId = TEMP_COUNT.getAndIncrement(); @Override - public ResourceId apply(ResourceId baseOutputDirectory) { + public ResourceId apply(ResourceId tempDirectory) { // Temp directory has a timestamp and a unique ID String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId); - return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY); + return tempDirectory + .getCurrentDirectory() + .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY); } } @@ -485,22 +541,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * @param tempDirectory the base directory to be used for temporary output files. 
*/ @Experimental(Kind.FILESYSTEM) - public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) { + public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) { this(sink, StaticValueProvider.of(tempDirectory)); } private WriteOperation( - FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) { + FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) { this.sink = sink; this.tempDirectory = tempDirectory; this.windowedWrites = false; } /** - * Clients must implement to return a subclass of {@link Writer}. This - * method must not mutate the state of the object. + * Clients must implement to return a subclass of {@link Writer}. This method must not mutate + * the state of the object. */ - public abstract Writer<T> createWriter() throws Exception; + public abstract Writer<OutputT, DestinationT> createWriter() throws Exception; /** * Indicates that the operation will be performing windowed writes. @@ -514,8 +570,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * removing temporary files. * * <p>Finalization may be overridden by subclass implementations to perform customized - * finalization (e.g., initiating some operation on output bundles, merging them, etc.). - * {@code writerResults} contains the filenames of written bundles. + * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code + * writerResults} contains the filenames of written bundles. * * <p>If subclasses override this method, they must guarantee that its implementation is * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It @@ -523,7 +579,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * * @param writerResults the results of writes (FileResult). 
*/ - public void finalize(Iterable<FileResult> writerResults) throws Exception { + public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception { // Collect names of temporary files and rename them. Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults); copyToOutputFiles(outputFilenames); @@ -542,17 +598,14 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { @Experimental(Kind.FILESYSTEM) protected final Map<ResourceId, ResourceId> buildOutputFilenames( - Iterable<FileResult> writerResults) { + Iterable<FileResult<DestinationT>> writerResults) { int numShards = Iterables.size(writerResults); Map<ResourceId, ResourceId> outputFilenames = new HashMap<>(); - FilenamePolicy policy = getSink().getFilenamePolicy(); - ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get(); - // Either all results have a shard number set (if the sink is configured with a fixed // number of shards), or they all don't (otherwise). Boolean isShardNumberSetEverywhere = null; - for (FileResult result : writerResults) { + for (FileResult<DestinationT> result : writerResults) { boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM); if (isShardNumberSetEverywhere == null) { isShardNumberSetEverywhere = isShardNumberSetHere; @@ -568,7 +621,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { isShardNumberSetEverywhere = true; } - List<FileResult> resultsWithShardNumbers = Lists.newArrayList(); + List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList(); if (isShardNumberSetEverywhere) { resultsWithShardNumbers = Lists.newArrayList(writerResults); } else { @@ -577,29 +630,32 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { // case of triggers, the list of FileResult objects in the Finalize iterable is not // deterministic, and might change over retries. 
This breaks the assumption below that // sorting the FileResult objects provides idempotency. - List<FileResult> sortedByTempFilename = + List<FileResult<DestinationT>> sortedByTempFilename = Ordering.from( - new Comparator<FileResult>() { - @Override - public int compare(FileResult first, FileResult second) { - String firstFilename = first.getTempFilename().toString(); - String secondFilename = second.getTempFilename().toString(); - return firstFilename.compareTo(secondFilename); - } - }) + new Comparator<FileResult<DestinationT>>() { + @Override + public int compare( + FileResult<DestinationT> first, FileResult<DestinationT> second) { + String firstFilename = first.getTempFilename().toString(); + String secondFilename = second.getTempFilename().toString(); + return firstFilename.compareTo(secondFilename); + } + }) .sortedCopy(writerResults); for (int i = 0; i < sortedByTempFilename.size(); i++) { resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i)); } } - for (FileResult result : resultsWithShardNumbers) { + for (FileResult<DestinationT> result : resultsWithShardNumbers) { checkArgument( result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result); outputFilenames.put( result.getTempFilename(), result.getDestinationFile( - policy, baseOutputDir, numShards, getSink().getExtension())); + getSink().getDynamicDestinations(), + numShards, + getSink().getWritableByteChannelFactory())); } int numDistinctShards = new HashSet<>(outputFilenames.values()).size(); @@ -615,18 +671,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * * <p>Can be called from subclasses that override {@link WriteOperation#finalize}. * - * <p>Files will be named according to the file naming template. The order of the output files - * will be the same as the sorted order of the input filenames. 
In other words, if the input - * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and - * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to - * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc. + * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files + * will be the same as the sorted order of the input filenames. In other words (when using + * {@link DefaultFilenamePolicy}), if the input filenames are ["C", "A", "B"], baseFilename (int + * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is + * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B + * will be copied to dir/file-001-of-003.txt, etc. * * @param filenames the filenames of temporary files. */ @VisibleForTesting @Experimental(Kind.FILESYSTEM) - final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) - throws IOException { + final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException { int numFiles = filenames.size(); if (numFiles > 0) { LOG.debug("Copying {} files.", numFiles); @@ -698,10 +754,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } } - /** - * Returns the FileBasedSink for this write operation. - */ - public FileBasedSink<T> getSink() { + /** Returns the FileBasedSink for this write operation. */ + public FileBasedSink<OutputT, DestinationT> getSink() { return sink; } @@ -719,33 +773,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } } - /** Returns the extension that will be written to the produced files. */ - protected final String getExtension() { - String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), ""); - if (!extension.isEmpty() && !extension.startsWith(".")) { - extension = "." 
+ extension; - } - return extension; + /** Returns the {@link WritableByteChannelFactory} used. */ + protected final WritableByteChannelFactory getWritableByteChannelFactory() { + return writableByteChannelFactory; } /** - * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass - * implementations provide a method that can write a single value to a - * {@link WritableByteChannel}. + * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass implementations + * provide a method that can write a single value to a {@link WritableByteChannel}. * * <p>Subclass implementations may also override methods that write headers and footers before and * after the values in a bundle, respectively, as well as provide a MIME type for the output * channel. * - * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore - * any access to static members or methods should be thread safe. + * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore any + * access to static members or methods should be thread safe. * - * @param <T> the type of values to write. + * @param <OutputT> the type of values to write. */ - public abstract static class Writer<T> { + public abstract static class Writer<OutputT, DestinationT> { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); - private final WriteOperation<T> writeOperation; + private final WriteOperation<OutputT, DestinationT> writeOperation; /** Unique id for this output bundle. */ private String id; @@ -753,6 +802,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private BoundedWindow window; private PaneInfo paneInfo; private int shard = -1; + private DestinationT destination; /** The output file for this bundle. May be null if opening failed. 
*/ private @Nullable ResourceId outputFile; @@ -772,10 +822,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { */ private final String mimeType; - /** - * Construct a new {@link Writer} that will produce files of the given MIME type. - */ - public Writer(WriteOperation<T> writeOperation, String mimeType) { + /** Construct a new {@link Writer} that will produce files of the given MIME type. */ + public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) { checkNotNull(writeOperation); this.writeOperation = writeOperation; this.mimeType = mimeType; @@ -818,28 +866,29 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * id populated for the case of static sharding. In cases where the runner is dynamically * picking sharding, shard might be set to -1. */ - public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard) + public final void openWindowed( + String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination) throws Exception { if (!getWriteOperation().windowedWrites) { throw new IllegalStateException("openWindowed called a non-windowed sink."); } - open(uId, window, paneInfo, shard); + open(uId, window, paneInfo, shard, destination); } /** * Called for each value in the bundle. */ - public abstract void write(T value) throws Exception; + public abstract void write(OutputT value) throws Exception; /** - * Similar to {@link #openWindowed} however for the case where unwindowed writes were - * requested. + * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested. 
*/ - public final void openUnwindowed(String uId, int shard) throws Exception { + public final void openUnwindowed(String uId, int shard, DestinationT destination) + throws Exception { if (getWriteOperation().windowedWrites) { throw new IllegalStateException("openUnwindowed called a windowed sink."); } - open(uId, null, null, shard); + open(uId, null, null, shard, destination); } // Helper function to close a channel, on exception cases. @@ -855,14 +904,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } } - private void open(String uId, - @Nullable BoundedWindow window, - @Nullable PaneInfo paneInfo, - int shard) throws Exception { + private void open( + String uId, + @Nullable BoundedWindow window, + @Nullable PaneInfo paneInfo, + int shard, + DestinationT destination) + throws Exception { this.id = uId; this.window = window; this.paneInfo = paneInfo; this.shard = shard; + this.destination = destination; ResourceId tempDirectory = getWriteOperation().tempDirectory.get(); outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE); verifyNotNull( @@ -908,7 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } /** Closes the channel and returns the bundle result. 
*/ - public final FileResult close() throws Exception { + public final FileResult<DestinationT> close() throws Exception { checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile"); LOG.debug("Writing footer to {}.", outputFile); @@ -938,35 +991,41 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { throw new IOException(String.format("Failed closing channel to %s", outputFile), e); } - FileResult result = new FileResult(outputFile, shard, window, paneInfo); + FileResult<DestinationT> result = + new FileResult<>(outputFile, shard, window, paneInfo, destination); LOG.debug("Result for bundle {}: {}", this.id, outputFile); return result; } - /** - * Return the WriteOperation that this Writer belongs to. - */ - public WriteOperation<T> getWriteOperation() { + /** Return the WriteOperation that this Writer belongs to. */ + public WriteOperation<OutputT, DestinationT> getWriteOperation() { return writeOperation; } } /** - * Result of a single bundle write. Contains the filename produced by the bundle, and if known - * the final output filename. + * Result of a single bundle write. Contains the filename produced by the bundle, and if known the + * final output filename. 
*/ - public static final class FileResult { + public static final class FileResult<DestinationT> { private final ResourceId tempFilename; private final int shard; private final BoundedWindow window; private final PaneInfo paneInfo; + private final DestinationT destination; @Experimental(Kind.FILESYSTEM) - public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) { + public FileResult( + ResourceId tempFilename, + int shard, + BoundedWindow window, + PaneInfo paneInfo, + DestinationT destination) { this.tempFilename = tempFilename; this.shard = shard; this.window = window; this.paneInfo = paneInfo; + this.destination = destination; } @Experimental(Kind.FILESYSTEM) @@ -978,8 +1037,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { return shard; } - public FileResult withShard(int shard) { - return new FileResult(tempFilename, shard, window, paneInfo); + public FileResult<DestinationT> withShard(int shard) { + return new FileResult<>(tempFilename, shard, window, paneInfo, destination); } public BoundedWindow getWindow() { @@ -990,17 +1049,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { return paneInfo; } + public DestinationT getDestination() { + return destination; + } + @Experimental(Kind.FILESYSTEM) - public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory, - int numShards, String extension) { + public ResourceId getDestinationFile( + DynamicDestinations<?, DestinationT> dynamicDestinations, + int numShards, + OutputFileHints outputFileHints) { checkArgument(getShard() != UNKNOWN_SHARDNUM); checkArgument(numShards > 0); + FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination); if (getWindow() != null) { - return policy.windowedFilename(outputDirectory, new WindowedContext( - getWindow(), getPaneInfo(), getShard(), numShards), extension); + return policy.windowedFilename( + new WindowedContext(getWindow(), 
getPaneInfo(), getShard(), numShards), + outputFileHints); } else { - return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards), - extension); + return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints); } } @@ -1014,22 +1080,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } } - /** - * A coder for {@link FileResult} objects. - */ - public static final class FileResultCoder extends StructuredCoder<FileResult> { + /** A coder for {@link FileResult} objects. */ + public static final class FileResultCoder<DestinationT> + extends StructuredCoder<FileResult<DestinationT>> { private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of(); private static final Coder<Integer> SHARD_CODER = VarIntCoder.of(); private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE); - private final Coder<BoundedWindow> windowCoder; + private final Coder<DestinationT> destinationCoder; - protected FileResultCoder(Coder<BoundedWindow> windowCoder) { + protected FileResultCoder( + Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) { this.windowCoder = NullableCoder.of(windowCoder); + this.destinationCoder = destinationCoder; } - public static FileResultCoder of(Coder<BoundedWindow> windowCoder) { - return new FileResultCoder(windowCoder); + public static <DestinationT> FileResultCoder<DestinationT> of( + Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) { + return new FileResultCoder<>(windowCoder, destinationCoder); } @Override @@ -1038,8 +1106,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } @Override - public void encode(FileResult value, OutputStream outStream) - throws IOException { + public void encode(FileResult<DestinationT> value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } @@ -1047,17 
+1114,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { windowCoder.encode(value.getWindow(), outStream); PANE_INFO_CODER.encode(value.getPaneInfo(), outStream); SHARD_CODER.encode(value.getShard(), outStream); + destinationCoder.encode(value.getDestination(), outStream); } @Override - public FileResult decode(InputStream inStream) - throws IOException { + public FileResult<DestinationT> decode(InputStream inStream) throws IOException { String tempFilename = FILENAME_CODER.decode(inStream); BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream); int shard = SHARD_CODER.decode(inStream); - return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */), - shard, window, paneInfo); + DestinationT destination = destinationCoder.decode(inStream); + return new FileResult<>( + FileSystems.matchNewResource(tempFilename, false /* isDirectory */), + shard, + window, + paneInfo, + destination); } @Override @@ -1066,25 +1138,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { windowCoder.verifyDeterministic(); PANE_INFO_CODER.verifyDeterministic(); SHARD_CODER.verifyDeterministic(); + destinationCoder.verifyDeterministic(); } } /** - * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} - * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that - * would normally be written directly to the {@link WritableByteChannel} passed into - * {@link WritableByteChannelFactory#create(WritableByteChannel)}. - * - * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when - * building {@link DisplayData}. + * Provides hints about how to generate output files, such as a suggested filename suffix (e.g. + * based on the compression type), and the file MIME type. 
*/ - public interface WritableByteChannelFactory extends Serializable { - /** - * @param channel the {@link WritableByteChannel} to wrap - * @return the {@link WritableByteChannel} to be used during output - */ - WritableByteChannel create(WritableByteChannel channel) throws IOException; - + public interface OutputFileHints extends Serializable { /** * Returns the MIME type that should be used for the files that will hold the output data. May * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change @@ -1101,6 +1163,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP} */ @Nullable - String getFilenameSuffix(); + String getSuggestedFilenameSuffix(); + } + + /** + * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} + * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that + * would normally be written directly to the {@link WritableByteChannel} passed into {@link + * WritableByteChannelFactory#create(WritableByteChannel)}. + * + * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when + * building {@link DisplayData}. 
+ */ + public interface WritableByteChannelFactory extends OutputFileHints { + /** + * @param channel the {@link WritableByteChannel} to wrap + * @return the {@link WritableByteChannel} to be used during output + */ + WritableByteChannel create(WritableByteChannel channel) throws IOException; } } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index e288075..6e7b243 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -355,12 +356,11 @@ public class TFRecordIO { public PDone expand(PCollection<byte[]> input) { checkState(getOutputPrefix() != null, "need to set the output prefix of a TFRecordIO.Write transform"); - WriteFiles<byte[]> write = WriteFiles.to( + WriteFiles<byte[], Void, byte[]> write = + WriteFiles.<byte[], Void, byte[]>to( new TFRecordSink( - getOutputPrefix(), - getShardTemplate(), - getFilenameSuffix(), - getCompressionType())); + getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()), + SerializableFunctions.<byte[]>identity()); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -546,20 +546,20 @@ public class TFRecordIO { } } - /** - * A {@link 
FileBasedSink} for TFRecord files. Produces TFRecord files. - */ + /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */ @VisibleForTesting - static class TFRecordSink extends FileBasedSink<byte[]> { + static class TFRecordSink extends FileBasedSink<byte[], Void> { @VisibleForTesting - TFRecordSink(ValueProvider<ResourceId> outputPrefix, + TFRecordSink( + ValueProvider<ResourceId> outputPrefix, @Nullable String shardTemplate, @Nullable String suffix, TFRecordIO.CompressionType compressionType) { super( outputPrefix, - DefaultFilenamePolicy.constructUsingStandardParameters( - outputPrefix, shardTemplate, suffix, false), + DynamicFileDestinations.constant( + DefaultFilenamePolicy.fromStandardParameters( + outputPrefix, shardTemplate, suffix, false)), writableByteChannelFactory(compressionType)); } @@ -571,7 +571,7 @@ public class TFRecordIO { } @Override - public WriteOperation<byte[]> createWriteOperation() { + public WriteOperation<byte[], Void> createWriteOperation() { return new TFRecordWriteOperation(this); } @@ -590,30 +590,24 @@ public class TFRecordIO { return CompressionType.UNCOMPRESSED; } - /** - * A {@link WriteOperation - * WriteOperation} for TFRecord files. - */ - private static class TFRecordWriteOperation extends WriteOperation<byte[]> { + /** A {@link WriteOperation WriteOperation} for TFRecord files. */ + private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> { private TFRecordWriteOperation(TFRecordSink sink) { super(sink); } @Override - public Writer<byte[]> createWriter() throws Exception { + public Writer<byte[], Void> createWriter() throws Exception { return new TFRecordWriter(this); } } - /** - * A {@link Writer Writer} - * for TFRecord files. - */ - private static class TFRecordWriter extends Writer<byte[]> { + /** A {@link Writer Writer} for TFRecord files. 
*/ + private static class TFRecordWriter extends Writer<byte[], Void> { private WritableByteChannel outChannel; private TFRecordCodec codec; - private TFRecordWriter(WriteOperation<byte[]> writeOperation) { + private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) { super(writeOperation, MimeTypes.BINARY); } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index f1eb7c0..5241589 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; +import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; @@ -37,6 +40,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import 
org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -65,19 +69,8 @@ import org.apache.beam.sdk.values.PDone; * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write. * - * <p>By default, all input is put into the global window before writing. If per-window writes are - * desired - for example, when using a streaming runner - - * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be - * preserved. When producing windowed writes, the number of output shards must be set explicitly - * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a - * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} can also be - * set in case you need better control over naming files created by unique windows. - * {@link DefaultFilenamePolicy} policy for producing unique filenames might not be appropriate - * for your use case. - * - * <p>Any existing files with the same names as generated output files will be overwritten. - * * <p>For example: + * * <pre>{@code * // A simple Write to a local file (only runs locally): * PCollection<String> lines = ...; @@ -85,10 +78,49 @@ import org.apache.beam.sdk.values.PDone; * * // Same as above, only with Gzip compression: * PCollection<String> lines = ...; - * lines.apply(TextIO.write().to("/path/to/file.txt")); + * lines.apply(TextIO.write().to("/path/to/file.txt") * .withSuffix(".txt") * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); * }</pre> + * + * <p>By default, all input is put into the global window before writing. If per-window writes are + * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()} + * will cause windowing and triggering to be preserved.
When producing windowed writes with a + * streaming runner that supports triggers, the number of output shards must be set explicitly using + * {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen + * value, so you may not need to set it yourself. If setting an explicit template using {@link + * TextIO.Write#withShardNameTemplate(String)}, make sure that the template contains placeholders + * for the window and the pane; W is expanded into the window text, and P into the pane; the default + * template will include both the window and the pane in the filename. + * + * <p>If you want better control over how filenames are generated than the default policy allows, a + * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}. + * + * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this + * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class + * allows you to convert any input value into a custom destination object, and map that destination + * object to a {@link FilenamePolicy}. This allows using different filename policies (or more + * commonly, differently-configured instances of the same policy) based on the input record. Often + * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which + * allows your {@link DynamicDestinations} object to examine the input type and takes a format + * function to convert that type to a string for writing. + * + * <p>A convenience shortcut is provided for the case where the default naming policy is used, but + * different configurations of this policy are wanted based on the input record. Default naming + * policies can be configured using the {@link DefaultFilenamePolicy.Params} object.
+ * + * <pre>{@code + * PCollection<UserEvent> lines = ...; + * lines.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent()) + * .to(new SerializableFunction<UserEvent, Params>() { + * public Params apply(UserEvent value) { + * return new Params().withBaseFilename(baseDirectory + "/" + value.country()); + * } + * }, + * new Params().withBaseFilename(baseDirectory + "/empty"))); + * }</pre> + * + * <p>Any existing files with the same names as generated output files will be overwritten. */ public class TextIO { /** @@ -105,11 +137,29 @@ public class TextIO { * line. */ public static Write write() { - return new AutoValue_TextIO_Write.Builder() + return new TextIO.Write(); + } + + /** + * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files + * matching a sharding pattern), with each element of the input collection encoded into its own + * line. + * + * <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type + * {@code T}, along with a format function that converts the input type {@code T} to the String + * that will be written to the file. The advantage of this is that it allows a user-provided {@link + * DynamicDestinations} object, set via {@link TypedWrite#to(DynamicDestinations)}, to examine the + * user's custom type when choosing a destination.
+ */ + public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) { + return new AutoValue_TextIO_TypedWrite.Builder<T>() .setFilenamePrefix(null) + .setTempDirectory(null) .setShardTemplate(null) .setFilenameSuffix(null) .setFilenamePolicy(null) + .setDynamicDestinations(null) + .setFormatFunction(formatFunction) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) .setNumShards(0) @@ -223,18 +273,21 @@ public class TextIO { } } - - ///////////////////////////////////////////////////////////////////////////// + // /////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. */ @AutoValue - public abstract static class Write extends PTransform<PCollection<String>, PDone> { + public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> { /** The prefix of each file written, combined with suffix and shardTemplate. */ @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); /** The suffix of each file written, combined with prefix and shardTemplate. */ @Nullable abstract String getFilenameSuffix(); + /** The base directory used for generating temporary files. */ + @Nullable + abstract ValueProvider<ResourceId> getTempDirectory(); + /** An optional header to add to each file. */ @Nullable abstract String getHeader(); @@ -250,6 +303,13 @@ public class TextIO { /** A policy for naming output files. */ @Nullable abstract FilenamePolicy getFilenamePolicy(); + /** Allows for value-dependent {@link DynamicDestinations} to be vended. */ + @Nullable + abstract DynamicDestinations<T, ?> getDynamicDestinations(); + + /** A function that converts T to a String, for writing to the file. */ + abstract SerializableFunction<T, String> getFormatFunction(); + /** Whether to write windowed output files. 
*/ abstract boolean getWindowedWrites(); @@ -259,66 +319,68 @@ public class TextIO { */ abstract WritableByteChannelFactory getWritableByteChannelFactory(); - abstract Builder toBuilder(); + abstract Builder<T> toBuilder(); @AutoValue.Builder - abstract static class Builder { - abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); - abstract Builder setShardTemplate(@Nullable String shardTemplate); - abstract Builder setFilenameSuffix(@Nullable String filenameSuffix); - abstract Builder setHeader(@Nullable String header); - abstract Builder setFooter(@Nullable String footer); - abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); - abstract Builder setNumShards(int numShards); - abstract Builder setWindowedWrites(boolean windowedWrites); - abstract Builder setWritableByteChannelFactory( + abstract static class Builder<T> { + abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); + + abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory); + + abstract Builder<T> setShardTemplate(@Nullable String shardTemplate); + + abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix); + + abstract Builder<T> setHeader(@Nullable String header); + + abstract Builder<T> setFooter(@Nullable String footer); + + abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); + + abstract Builder<T> setDynamicDestinations( + @Nullable DynamicDestinations<T, ?> dynamicDestinations); + + abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction); + + abstract Builder<T> setNumShards(int numShards); + + abstract Builder<T> setWindowedWrites(boolean windowedWrites); + + abstract Builder<T> setWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory); - abstract Write build(); + abstract TypedWrite<T> build(); } /** - * Writes to text files with the given prefix. 
The given {@code prefix} can reference any - * {@link FileSystem} on the classpath. - * - * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. + * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link + * FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to + * generate filenames. * * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix - * to define the base output directory and file prefix, a shard identifier (see - * {@link #withNumShards(int)}), and a common suffix (if supplied using - * {@link #withSuffix(String)}). + * to define the base output directory and file prefix, a shard identifier (see {@link + * #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}). + * + * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case + * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. + * Custom filename policies do not automatically see this prefix - you should explicitly pass + * the prefix into your {@link FilenamePolicy} object if you need this. * - * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)}, - * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should - * not be set. + * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to + * infer a directory for temporary files. */ - public Write to(String filenamePrefix) { + public TypedWrite<T> to(String filenamePrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)); } - /** - * Writes to text files with prefix from the given resource. - * - * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. 
- * - * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix - * to define the base output directory and file prefix, a shard identifier (see - * {@link #withNumShards(int)}), and a common suffix (if supplied using - * {@link #withSuffix(String)}). - * - * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)}, - * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should - * not be set. - */ + /** Like {@link #to(String)}. */ @Experimental(Kind.FILESYSTEM) - public Write to(ResourceId filenamePrefix) { + public TypedWrite<T> to(ResourceId filenamePrefix) { return toResource(StaticValueProvider.of(filenamePrefix)); } - /** - * Like {@link #to(String)}. - */ - public Write to(ValueProvider<String> outputPrefix) { + /** Like {@link #to(String)}. */ + public TypedWrite<T> to(ValueProvider<String> outputPrefix) { return toResource(NestedValueProvider.of(outputPrefix, new SerializableFunction<String, ResourceId>() { @Override @@ -329,43 +391,77 @@ public class TextIO { } /** - * Like {@link #to(ResourceId)}. + * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A + * directory for temporary files must be specified using {@link #withTempDirectory}. */ + public TypedWrite<T> to(FilenamePolicy filenamePolicy) { + return toBuilder().setFilenamePolicy(filenamePolicy).build(); + } + + /** + * Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. These + * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for + * temporary files must be specified using {@link #withTempDirectory}. + */ + public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) { + return toBuilder().setDynamicDestinations(dynamicDestinations).build(); + } + + /** + * Write to dynamic destinations using the default filename policy. 
The destinationFunction maps + * the input record to a {@link DefaultFilenamePolicy.Params} object that specifies where the + * records should be written (base filename, file suffix, and shard template). The + * emptyDestination parameter specifies where empty files should be written when the written + * {@link PCollection} is empty. + */ + public TypedWrite<T> to( + SerializableFunction<T, Params> destinationFunction, Params emptyDestination) { + return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination)); + } + + /** Like {@link #to(ResourceId)}. */ @Experimental(Kind.FILESYSTEM) - public Write toResource(ValueProvider<ResourceId> filenamePrefix) { + public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) { return toBuilder().setFilenamePrefix(filenamePrefix).build(); } + /** Set the base directory used to generate temporary files. */ + @Experimental(Kind.FILESYSTEM) + public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { + return toBuilder().setTempDirectory(tempDirectory).build(); + } + + /** Set the base directory used to generate temporary files. */ + @Experimental(Kind.FILESYSTEM) + public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) { + return withTempDirectory(StaticValueProvider.of(tempDirectory)); + } + /** * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be - * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. + * used when using one of the default filename-prefix to() overloads - i.e. not when using + * either {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used.
*/ - public Write withShardNameTemplate(String shardTemplate) { + public TypedWrite<T> withShardNameTemplate(String shardTemplate) { return toBuilder().setShardTemplate(shardTemplate).build(); } /** - * Configures the filename suffix for written files. This option may only be used when - * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. + * Configures the filename suffix for written files. This option may only be used when using one + * of the default filename-prefix to() overrides - i.e. not when using either {@link + * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public Write withSuffix(String filenameSuffix) { + public TypedWrite<T> withSuffix(String filenameSuffix) { return toBuilder().setFilenameSuffix(filenameSuffix).build(); } /** - * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files. - */ - public Write withFilenamePolicy(FilenamePolicy filenamePolicy) { - return toBuilder().setFilenamePolicy(filenamePolicy).build(); - } - - /** * Configures the number of output shards produced overall (when using unwindowed writes) or * per-window (when using windowed writes). * @@ -375,14 +471,13 @@ public class TextIO { * * @param numShards the number of shards to use, or 0 to let the system decide. */ - public Write withNumShards(int numShards) { + public TypedWrite<T> withNumShards(int numShards) { checkArgument(numShards >= 0); return toBuilder().setNumShards(numShards).build(); } /** - * Forces a single file as output and empty shard name template. This option is only compatible - * with unwindowed writes. + * Forces a single file as output and empty shard name template. * * <p>For unwindowed writes, constraining the number of shards is likely to reduce the * performance of a pipeline. 
Setting this value is not recommended unless you require a @@ -390,7 +485,7 @@ public class TextIO { * * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")} */ - public Write withoutSharding() { + public TypedWrite<T> withoutSharding() { return withNumShards(1).withShardNameTemplate(""); } @@ -399,7 +494,7 @@ public class TextIO { * * <p>A {@code null} value will clear any previously configured header. */ - public Write withHeader(@Nullable String header) { + public TypedWrite<T> withHeader(@Nullable String header) { return toBuilder().setHeader(header).build(); } @@ -408,48 +503,82 @@ public class TextIO { * * <p>A {@code null} value will clear any previously configured footer. */ - public Write withFooter(@Nullable String footer) { + public TypedWrite<T> withFooter(@Nullable String footer) { return toBuilder().setFooter(footer).build(); } /** - * Returns a transform for writing to text files like this one but that has the given - * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. - * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. + * Returns a transform for writing to text files like this one but that has the given {@link + * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The + * default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. * * <p>A {@code null} value will reset the value to the default value mentioned above. */ - public Write withWritableByteChannelFactory( + public TypedWrite<T> withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build(); } - public Write withWindowedWrites() { + /** + * Preserves windowing of input elements and writes them to files based on the element's window. + * + * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}.
Filenames will be generated using + * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. + */ + public TypedWrite<T> withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } + private DynamicDestinations<T, ?> resolveDynamicDestinations() { + DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); + if (dynamicDestinations == null) { + FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); + if (usedFilenamePolicy == null) { + usedFilenamePolicy = + DefaultFilenamePolicy.fromStandardParameters( + getFilenamePrefix(), + getShardTemplate(), + getFilenameSuffix(), + getWindowedWrites()); + } + dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy); + } + return dynamicDestinations; + } + @Override - public PDone expand(PCollection<String> input) { - checkState(getFilenamePrefix() != null, - "Need to set the filename prefix of a TextIO.Write transform."); + public PDone expand(PCollection<T> input) { + checkState( + getFilenamePrefix() != null || getTempDirectory() != null, + "Need to set either the filename prefix or the tempDirectory of a TextIO.Write " + + "transform."); checkState( - (getFilenamePolicy() == null) - || (getShardTemplate() == null && getFilenameSuffix() == null), - "Cannot set a filename policy and also a filename template or suffix."); - - FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); - if (usedFilenamePolicy == null) { - usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( - getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); + getFilenamePolicy() == null || getDynamicDestinations() == null, + "Cannot specify both a filename policy and dynamic destinations"); + if (getFilenamePolicy() != null || getDynamicDestinations() != null) { + checkState( + getShardTemplate() == null && getFilenameSuffix() == null, + "shardTemplate and filenameSuffix should only be used with the default " + + 
"filename policy"); } - WriteFiles<String> write = + return expandTyped(input, resolveDynamicDestinations()); + } + + public <DestinationT> PDone expandTyped( + PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { + ValueProvider<ResourceId> tempDirectory = getTempDirectory(); + if (tempDirectory == null) { + tempDirectory = getFilenamePrefix(); + } + WriteFiles<T, DestinationT, String> write = WriteFiles.to( - new TextSink( - getFilenamePrefix(), - usedFilenamePolicy, + new TextSink<>( + tempDirectory, + dynamicDestinations, getHeader(), getFooter(), - getWritableByteChannelFactory())); + getWritableByteChannelFactory()), + getFormatFunction()); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -463,27 +592,26 @@ public class TextIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String prefixString = ""; - if (getFilenamePrefix() != null) { - prefixString = getFilenamePrefix().isAccessible() - ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString(); + resolveDynamicDestinations().populateDisplayData(builder); + String tempDirectory = null; + if (getTempDirectory() != null) { + tempDirectory = + getTempDirectory().isAccessible() + ? 
getTempDirectory().get().toString() + : getTempDirectory().toString(); } builder - .addIfNotNull(DisplayData.item("filePrefix", prefixString) - .withLabel("Output File Prefix")) - .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) - .withLabel("Output File Suffix")) - .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate()) - .withLabel("Output Shard Name Template")) - .addIfNotDefault(DisplayData.item("numShards", getNumShards()) - .withLabel("Maximum Output Shards"), 0) - .addIfNotNull(DisplayData.item("fileHeader", getHeader()) - .withLabel("File Header")) - .addIfNotNull(DisplayData.item("fileFooter", getFooter()) - .withLabel("File Footer")) - .add(DisplayData - .item("writableByteChannelFactory", getWritableByteChannelFactory().toString()) - .withLabel("Compression/Transformation Type")); + .addIfNotDefault( + DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) + .addIfNotNull( + DisplayData.item("tempDirectory", tempDirectory) + .withLabel("Directory for temporary files")) + .addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header")) + .addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer")) + .add( + DisplayData.item( + "writableByteChannelFactory", getWritableByteChannelFactory().toString()) + .withLabel("Compression/Transformation Type")); } @Override @@ -493,6 +621,128 @@ public class TextIO { } /** + * This class is used as the default return value of {@link TextIO#write()}. + * + * <p>All methods in this class delegate to the appropriate method of {@link TextIO.TypedWrite}. + * This class exists for backwards compatibility, and will be removed in Beam 3.0. 
+ */ + public static class Write extends PTransform<PCollection<String>, PDone> { + @VisibleForTesting TypedWrite<String> inner; + + Write() { + this(TextIO.writeCustomType(SerializableFunctions.<String>identity())); + } + + Write(TypedWrite<String> inner) { + this.inner = inner; + } + + /** See {@link TypedWrite#to(String)}. */ + public Write to(String filenamePrefix) { + return new Write(inner.to(filenamePrefix)); + } + + /** See {@link TypedWrite#to(ResourceId)}. */ + @Experimental(Kind.FILESYSTEM) + public Write to(ResourceId filenamePrefix) { + return new Write(inner.to(filenamePrefix)); + } + + /** See {@link TypedWrite#to(ValueProvider)}. */ + public Write to(ValueProvider<String> outputPrefix) { + return new Write(inner.to(outputPrefix)); + } + + /** See {@link TypedWrite#toResource(ValueProvider)}. */ + @Experimental(Kind.FILESYSTEM) + public Write toResource(ValueProvider<ResourceId> filenamePrefix) { + return new Write(inner.toResource(filenamePrefix)); + } + + /** See {@link TypedWrite#to(FilenamePolicy)}. */ + @Experimental(Kind.FILESYSTEM) + public Write to(FilenamePolicy filenamePolicy) { + return new Write(inner.to(filenamePolicy)); + } + + /** See {@link TypedWrite#to(DynamicDestinations)}. */ + @Experimental(Kind.FILESYSTEM) + public Write to(DynamicDestinations<String, ?> dynamicDestinations) { + return new Write(inner.to(dynamicDestinations)); + } + + /** See {@link TypedWrite#to(SerializableFunction, Params)}. */ + @Experimental(Kind.FILESYSTEM) + public Write to( + SerializableFunction<String, Params> destinationFunction, Params emptyDestination) { + return new Write(inner.to(destinationFunction, emptyDestination)); + } + + /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ + @Experimental(Kind.FILESYSTEM) + public Write withTempDirectory(ValueProvider<ResourceId> tempDirectory) { + return new Write(inner.withTempDirectory(tempDirectory)); + } + + /** See {@link TypedWrite#withTempDirectory(ResourceId)}. 
*/ + @Experimental(Kind.FILESYSTEM) + public Write withTempDirectory(ResourceId tempDirectory) { + return new Write(inner.withTempDirectory(tempDirectory)); + } + + /** See {@link TypedWrite#withShardNameTemplate(String)}. */ + public Write withShardNameTemplate(String shardTemplate) { + return new Write(inner.withShardNameTemplate(shardTemplate)); + } + + /** See {@link TypedWrite#withSuffix(String)}. */ + public Write withSuffix(String filenameSuffix) { + return new Write(inner.withSuffix(filenameSuffix)); + } + + /** See {@link TypedWrite#withNumShards(int)}. */ + public Write withNumShards(int numShards) { + return new Write(inner.withNumShards(numShards)); + } + + /** See {@link TypedWrite#withoutSharding()}. */ + public Write withoutSharding() { + return new Write(inner.withoutSharding()); + } + + /** See {@link TypedWrite#withHeader(String)}. */ + public Write withHeader(@Nullable String header) { + return new Write(inner.withHeader(header)); + } + + /** See {@link TypedWrite#withFooter(String)}. */ + public Write withFooter(@Nullable String footer) { + return new Write(inner.withFooter(footer)); + } + + /** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */ + public Write withWritableByteChannelFactory( + WritableByteChannelFactory writableByteChannelFactory) { + return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory)); + } + + /** See {@link TypedWrite#withWindowedWrites}. */ + public Write withWindowedWrites() { + return new Write(inner.withWindowedWrites()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + inner.populateDisplayData(builder); + } + + @Override + public PDone expand(PCollection<String> input) { + return inner.expand(input); + } + } + + /** * Possible text file compression types. */ public enum CompressionType {
