http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 78340f3..f1eb7c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -22,38 +22,22 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.CompressedSource.CompressionMode; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; -import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -63,14 +47,13 @@ import org.apache.beam.sdk.values.PDone; * * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the - * file(s) to be read. Alternatively, if the filenames to be read are themselves in a - * {@link PCollection}, apply {@link TextIO#readAll()}. + * file(s) to be read. * * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', * or '\r\n'). * - * <p>Example 1: reading a file or filepattern. + * <p>Example: * * <pre>{@code * Pipeline p = ...; @@ -79,24 +62,22 @@ import org.apache.beam.sdk.values.PDone; * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt")); * }</pre> * - * <p>Example 2: reading a PCollection of filenames. - * - * <pre>{@code - * Pipeline p = ...; - * - * // E.g. the filenames might be computed from other data in the pipeline, or - * // read from a data source. - * PCollection<String> filenames = ...; - * - * // Read all files in the collection. - * PCollection<String> lines = filenames.apply(TextIO.readAll()); - * }</pre> - * * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write. * - * <p>For example: + * <p>By default, all input is put into the global window before writing. If per-window writes are + * desired - for example, when using a streaming runner - + * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be + * preserved. When producing windowed writes, the number of output shards must be set explicitly + * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a + * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} can also be + * set in case you need better control over naming files created by unique windows. + * {@link DefaultFilenamePolicy} policy for producing unique filenames might not be appropriate + * for your use case. + * + * <p>Any existing files with the same names as generated output files will be overwritten. * + * <p>For example: * <pre>{@code * // A simple Write to a local file (only runs locally): * PCollection<String> lines = ...; @@ -104,49 +85,10 @@ import org.apache.beam.sdk.values.PDone; * * // Same as above, only with Gzip compression: * PCollection<String> lines = ...; - * lines.apply(TextIO.write().to("/path/to/file.txt")) + * lines.apply(TextIO.write().to("/path/to/file.txt")); * .withSuffix(".txt") * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); * }</pre> - * - * <p>By default, all input is put into the global window before writing. If per-window writes are - * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()} - * will cause windowing and triggering to be preserved. When producing windowed writes with a - * streaming runner that supports triggers, the number of output shards must be set explicitly using - * {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen - * value, so you may need not set it yourself. If setting an explicit template using {@link - * TextIO.Write#withShardNameTemplate(String)}, make sure that the template contains placeholders - * for the window and the pane; W is expanded into the window text, and P into the pane; the default - * template will include both the window and the pane in the filename. - * - * <p>If you want better control over how filenames are generated than the default policy allows, a - * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}. - * - * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this - * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class - * allows you to convert any input value into a custom destination object, and map that destination - * object to a {@link FilenamePolicy}. This allows using different filename policies (or more - * commonly, differently-configured instances of the same policy) based on the input record. Often - * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which - * allows your {@link DynamicDestinations} object to examine the input type and takes a format - * function to convert that type to a string for writing. - * - * <p>A convenience shortcut is provided for the case where the default naming policy is used, but - * different configurations of this policy are wanted based on the input record. Default naming - * policies can be configured using the {@link DefaultFilenamePolicy.Params} object. - * - * <pre>{@code - * PCollection<UserEvent>> lines = ...; - * lines.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent()) - * .to(new SerializableFunction<UserEvent, Params>() { - * public String apply(UserEvent value) { - * return new Params().withBaseFilename(baseDirectory + "/" + value.country()); - * } - * }), - * new Params().withBaseFilename(baseDirectory + "/empty"); - * }</pre> - * - * <p>Any existing files with the same names as generated output files will be overwritten. */ public class TextIO { /** @@ -158,54 +100,16 @@ public class TextIO { } /** - * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link - * PCollection} of filepatterns. - * - * <p>Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is - * suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every - * filepattern is expanded once at the moment it is processed, rather than watched for new files - * matching the filepattern to appear. Likewise, every file is read once, rather than watched for - * new entries. - */ - public static ReadAll readAll() { - return new AutoValue_TextIO_ReadAll.Builder() - .setCompressionType(CompressionType.AUTO) - // 64MB is a reasonable value that allows to amortize the cost of opening files, - // but is not so large as to exhaust a typical runner's maximum amount of output per - // ProcessElement call. - .setDesiredBundleSizeBytes(64 * 1024 * 1024L) - .build(); - } - - /** * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files * matching a sharding pattern), with each element of the input collection encoded into its own * line. */ public static Write write() { - return new TextIO.Write(); - } - - /** - * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files - * matching a sharding pattern), with each element of the input collection encoded into its own - * line. - * - * <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type - * {@link T}, along with a format function that converts the input type {@link T} to the String - * that will be written to the file. The advantage of this is it allows a user-provided {@link - * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the - * user's custom type when choosing a destination. - */ - public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) { - return new AutoValue_TextIO_TypedWrite.Builder<T>() + return new AutoValue_TextIO_Write.Builder() .setFilenamePrefix(null) - .setTempDirectory(null) .setShardTemplate(null) .setFilenameSuffix(null) .setFilenamePolicy(null) - .setDynamicDestinations(null) - .setFormatFunction(formatFunction) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) .setNumShards(0) @@ -274,34 +178,29 @@ public class TextIO { // Helper to create a source specific to the requested compression type. protected FileBasedSource<String> getSource() { - return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType()); - } - - private static FileBasedSource<String> wrapWithCompression( - FileBasedSource<String> source, CompressionType compressionType) { - switch (compressionType) { + switch (getCompressionType()) { case UNCOMPRESSED: - return source; + return new TextSource(getFilepattern()); case AUTO: - return CompressedSource.from(source); + return CompressedSource.from(new TextSource(getFilepattern())); case BZIP2: return - CompressedSource.from(source) - .withDecompression(CompressionMode.BZIP2); + CompressedSource.from(new TextSource(getFilepattern())) + .withDecompression(CompressedSource.CompressionMode.BZIP2); case GZIP: return - CompressedSource.from(source) - .withDecompression(CompressionMode.GZIP); + CompressedSource.from(new TextSource(getFilepattern())) + .withDecompression(CompressedSource.CompressionMode.GZIP); case ZIP: return - CompressedSource.from(source) - .withDecompression(CompressionMode.ZIP); + CompressedSource.from(new TextSource(getFilepattern())) + .withDecompression(CompressedSource.CompressionMode.ZIP); case DEFLATE: return - CompressedSource.from(source) - .withDecompression(CompressionMode.DEFLATE); + CompressedSource.from(new TextSource(getFilepattern())) + .withDecompression(CompressedSource.CompressionMode.DEFLATE); default: - throw new IllegalArgumentException("Unknown compression type: " + compressionType); + throw new IllegalArgumentException("Unknown compression type: " + getFilepattern()); } } @@ -324,170 +223,18 @@ public class TextIO { } } - ///////////////////////////////////////////////////////////////////////////// - - /** Implementation of {@link #readAll}. */ - @AutoValue - public abstract static class ReadAll - extends PTransform<PCollection<String>, PCollection<String>> { - abstract CompressionType getCompressionType(); - abstract long getDesiredBundleSizeBytes(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setCompressionType(CompressionType compressionType); - abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); - - abstract ReadAll build(); - } - - /** Same as {@link Read#withCompressionType(CompressionType)}. */ - public ReadAll withCompressionType(CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); - } - - @VisibleForTesting - ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { - return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); - } - - @Override - public PCollection<String> expand(PCollection<String> input) { - return input - .apply("Expand glob", ParDo.of(new ExpandGlobFn())) - .apply( - "Split into ranges", - ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes()))) - .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>()) - .apply("Read", ParDo.of(new ReadTextFn(this))); - } - - private static class ReshuffleWithUniqueKey<T> - extends PTransform<PCollection<T>, PCollection<T>> { - @Override - public PCollection<T> expand(PCollection<T> input) { - return input - .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>())) - .apply("Reshuffle", Reshuffle.<Integer, T>of()) - .apply("Values", Values.<T>create()); - } - } - - private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> { - private int index; - - @Setup - public void setup() { - this.index = ThreadLocalRandom.current().nextInt(); - } - - @ProcessElement - public void process(ProcessContext c) { - c.output(KV.of(++index, c.element())); - } - } - - private static class ExpandGlobFn extends DoFn<String, Metadata> { - @ProcessElement - public void process(ProcessContext c) throws Exception { - MatchResult match = FileSystems.match(c.element()); - checkArgument( - match.status().equals(Status.OK), - "Failed to match filepattern %s: %s", - c.element(), - match.status()); - for (Metadata metadata : match.metadata()) { - c.output(metadata); - } - } - } - - private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> { - private final CompressionType compressionType; - private final long desiredBundleSize; - - private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) { - this.compressionType = compressionType; - this.desiredBundleSize = desiredBundleSize; - } - - @ProcessElement - public void process(ProcessContext c) { - Metadata metadata = c.element(); - final boolean isSplittable = isSplittable(metadata, compressionType); - if (!isSplittable) { - c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes()))); - return; - } - for (OffsetRange range : - new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) { - c.output(KV.of(metadata, range)); - } - } - - static boolean isSplittable(Metadata metadata, CompressionType compressionType) { - if (!metadata.isReadSeekEfficient()) { - return false; - } - switch (compressionType) { - case AUTO: - return !CompressionMode.isCompressed(metadata.resourceId().toString()); - case UNCOMPRESSED: - return true; - case GZIP: - case BZIP2: - case ZIP: - case DEFLATE: - return false; - default: - throw new UnsupportedOperationException("Unknown compression type: " + compressionType); - } - } - } - - private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> { - private final TextIO.ReadAll spec; - - private ReadTextFn(ReadAll spec) { - this.spec = spec; - } - - @ProcessElement - public void process(ProcessContext c) throws IOException { - Metadata metadata = c.element().getKey(); - OffsetRange range = c.element().getValue(); - FileBasedSource<String> source = - TextIO.Read.wrapWithCompression( - new TextSource(StaticValueProvider.of(metadata.toString())), - spec.getCompressionType()); - BoundedSource.BoundedReader<String> reader = - source - .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo()) - .createReader(c.getPipelineOptions()); - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } - } - } - } ///////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. */ @AutoValue - public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> { + public abstract static class Write extends PTransform<PCollection<String>, PDone> { /** The prefix of each file written, combined with suffix and shardTemplate. */ @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); /** The suffix of each file written, combined with prefix and shardTemplate. */ @Nullable abstract String getFilenameSuffix(); - /** The base directory used for generating temporary files. */ - @Nullable - abstract ValueProvider<ResourceId> getTempDirectory(); - /** An optional header to add to each file. */ @Nullable abstract String getHeader(); @@ -503,13 +250,6 @@ public class TextIO { /** A policy for naming output files. */ @Nullable abstract FilenamePolicy getFilenamePolicy(); - /** Allows for value-dependent {@link DynamicDestinations} to be vended. */ - @Nullable - abstract DynamicDestinations<T, ?> getDynamicDestinations(); - - /** A function that converts T to a String, for writing to the file. */ - abstract SerializableFunction<T, String> getFormatFunction(); - /** Whether to write windowed output files. */ abstract boolean getWindowedWrites(); @@ -519,68 +259,66 @@ public class TextIO { */ abstract WritableByteChannelFactory getWritableByteChannelFactory(); - abstract Builder<T> toBuilder(); + abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); - - abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory); - - abstract Builder<T> setShardTemplate(@Nullable String shardTemplate); - - abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix); - - abstract Builder<T> setHeader(@Nullable String header); - - abstract Builder<T> setFooter(@Nullable String footer); - - abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); - - abstract Builder<T> setDynamicDestinations( - @Nullable DynamicDestinations<T, ?> dynamicDestinations); - - abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction); - - abstract Builder<T> setNumShards(int numShards); - - abstract Builder<T> setWindowedWrites(boolean windowedWrites); - - abstract Builder<T> setWritableByteChannelFactory( + abstract static class Builder { + abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); + abstract Builder setShardTemplate(@Nullable String shardTemplate); + abstract Builder setFilenameSuffix(@Nullable String filenameSuffix); + abstract Builder setHeader(@Nullable String header); + abstract Builder setFooter(@Nullable String footer); + abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); + abstract Builder setNumShards(int numShards); + abstract Builder setWindowedWrites(boolean windowedWrites); + abstract Builder setWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory); - abstract TypedWrite<T> build(); + abstract Write build(); } /** - * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link - * FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to - * generate filenames. + * Writes to text files with the given prefix. The given {@code prefix} can reference any + * {@link FileSystem} on the classpath. * - * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix - * to define the base output directory and file prefix, a shard identifier (see {@link - * #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}). + * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. * - * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case - * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. - * Custom filename policies do not automatically see this prefix - you should explicitly pass - * the prefix into your {@link FilenamePolicy} object if you need this. + * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix + * to define the base output directory and file prefix, a shard identifier (see + * {@link #withNumShards(int)}), and a common suffix (if supplied using + * {@link #withSuffix(String)}). * - * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to - * infer a directory for temporary files. + * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)}, + * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should + * not be set. */ - public TypedWrite<T> to(String filenamePrefix) { + public Write to(String filenamePrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)); } - /** Like {@link #to(String)}. */ + /** + * Writes to text files with prefix from the given resource. + * + * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. + * + * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix + * to define the base output directory and file prefix, a shard identifier (see + * {@link #withNumShards(int)}), and a common suffix (if supplied using + * {@link #withSuffix(String)}). + * + * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)}, + * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should + * not be set. + */ @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> to(ResourceId filenamePrefix) { + public Write to(ResourceId filenamePrefix) { return toResource(StaticValueProvider.of(filenamePrefix)); } - /** Like {@link #to(String)}. */ - public TypedWrite<T> to(ValueProvider<String> outputPrefix) { + /** + * Like {@link #to(String)}. + */ + public Write to(ValueProvider<String> outputPrefix) { return toResource(NestedValueProvider.of(outputPrefix, new SerializableFunction<String, ResourceId>() { @Override @@ -591,77 +329,43 @@ public class TextIO { } /** - * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A - * directory for temporary files must be specified using {@link #withTempDirectory}. + * Like {@link #to(ResourceId)}. */ - public TypedWrite<T> to(FilenamePolicy filenamePolicy) { - return toBuilder().setFilenamePolicy(filenamePolicy).build(); - } - - /** - * Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. These - * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for - * temporary files must be specified using {@link #withTempDirectory}. - */ - public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) { - return toBuilder().setDynamicDestinations(dynamicDestinations).build(); - } - - /** - * Write to dynamic destinations using the default filename policy. The destinationFunction maps - * the input record to a {@link DefaultFilenamePolicy.Params} object that specifies where the - * records should be written (base filename, file suffix, and shard template). The - * emptyDestination parameter specified where empty files should be written for when the written - * {@link PCollection} is empty. - */ - public TypedWrite<T> to( - SerializableFunction<T, Params> destinationFunction, Params emptyDestination) { - return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination)); - } - - /** Like {@link #to(ResourceId)}. */ @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) { + public Write toResource(ValueProvider<ResourceId> filenamePrefix) { return toBuilder().setFilenamePrefix(filenamePrefix).build(); } - /** Set the base directory used to generate temporary files. */ - @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { - return toBuilder().setTempDirectory(tempDirectory).build(); - } - - /** Set the base directory used to generate temporary files. */ - @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) { - return withTempDirectory(StaticValueProvider.of(tempDirectory)); - } - /** * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be - * used when using one of the default filename-prefix to() overrides - i.e. not when using - * either {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. + * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public TypedWrite<T> withShardNameTemplate(String shardTemplate) { + public Write withShardNameTemplate(String shardTemplate) { return toBuilder().setShardTemplate(shardTemplate).build(); } /** - * Configures the filename suffix for written files. This option may only be used when using one - * of the default filename-prefix to() overrides - i.e. not when using either {@link - * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. + * Configures the filename suffix for written files. This option may only be used when + * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public TypedWrite<T> withSuffix(String filenameSuffix) { + public Write withSuffix(String filenameSuffix) { return toBuilder().setFilenameSuffix(filenameSuffix).build(); } /** + * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files. + */ + public Write withFilenamePolicy(FilenamePolicy filenamePolicy) { + return toBuilder().setFilenamePolicy(filenamePolicy).build(); + } + + /** * Configures the number of output shards produced overall (when using unwindowed writes) or * per-window (when using windowed writes). * @@ -671,13 +375,14 @@ public class TextIO { * * @param numShards the number of shards to use, or 0 to let the system decide. */ - public TypedWrite<T> withNumShards(int numShards) { + public Write withNumShards(int numShards) { checkArgument(numShards >= 0); return toBuilder().setNumShards(numShards).build(); } /** - * Forces a single file as output and empty shard name template. + * Forces a single file as output and empty shard name template. This option is only compatible + * with unwindowed writes. * * <p>For unwindowed writes, constraining the number of shards is likely to reduce the * performance of a pipeline. Setting this value is not recommended unless you require a @@ -685,7 +390,7 @@ public class TextIO { * * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")} */ - public TypedWrite<T> withoutSharding() { + public Write withoutSharding() { return withNumShards(1).withShardNameTemplate(""); } @@ -694,7 +399,7 @@ public class TextIO { * * <p>A {@code null} value will clear any previously configured header. */ - public TypedWrite<T> withHeader(@Nullable String header) { + public Write withHeader(@Nullable String header) { return toBuilder().setHeader(header).build(); } @@ -703,82 +408,48 @@ public class TextIO { * * <p>A {@code null} value will clear any previously configured footer. */ - public TypedWrite<T> withFooter(@Nullable String footer) { + public Write withFooter(@Nullable String footer) { return toBuilder().setFooter(footer).build(); } /** - * Returns a transform for writing to text files like this one but that has the given {@link - * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The - * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. + * Returns a transform for writing to text files like this one but that has the given + * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. + * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. * * <p>A {@code null} value will reset the value to the default value mentioned above. */ - public TypedWrite<T> withWritableByteChannelFactory( + public Write withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build(); } - /** - * Preserves windowing of input elements and writes them to files based on the element's window. - * - * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using - * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. - */ - public TypedWrite<T> withWindowedWrites() { + public Write withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } - private DynamicDestinations<T, ?> resolveDynamicDestinations() { - DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); - if (dynamicDestinations == null) { - FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); - if (usedFilenamePolicy == null) { - usedFilenamePolicy = - DefaultFilenamePolicy.fromStandardParameters( - getFilenamePrefix(), - getShardTemplate(), - getFilenameSuffix(), - getWindowedWrites()); - } - dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy); - } - return dynamicDestinations; - } - @Override - public PDone expand(PCollection<T> input) { - checkState( - getFilenamePrefix() != null || getTempDirectory() != null, - "Need to set either the filename prefix or the tempDirectory of a TextIO.Write " - + "transform."); + public PDone expand(PCollection<String> input) { + checkState(getFilenamePrefix() != null, + "Need to set the filename prefix of a TextIO.Write transform."); checkState( - getFilenamePolicy() == null || getDynamicDestinations() == null, - "Cannot specify both a filename policy and dynamic destinations"); - if (getFilenamePolicy() != null || getDynamicDestinations() != null) { - checkState( - getShardTemplate() == null && getFilenameSuffix() == null, - "shardTemplate and filenameSuffix should only be used with the default " - + "filename policy"); + (getFilenamePolicy() == null) + || (getShardTemplate() == null && getFilenameSuffix() == null), + "Cannot set a filename policy and also a filename template or suffix."); + + FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); + if (usedFilenamePolicy == null) { + usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( + getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); } - return expandTyped(input, resolveDynamicDestinations()); - } - - public <DestinationT> PDone expandTyped( - PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { - ValueProvider<ResourceId> tempDirectory = getTempDirectory(); - if (tempDirectory == null) { - tempDirectory = getFilenamePrefix(); - } - WriteFiles<T, DestinationT, String> write = + WriteFiles<String> write = WriteFiles.to( - new TextSink<>( - tempDirectory, - dynamicDestinations, + new TextSink( + getFilenamePrefix(), + usedFilenamePolicy, getHeader(), getFooter(), - getWritableByteChannelFactory()), - getFormatFunction()); + getWritableByteChannelFactory())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -792,26 +463,27 @@ public class TextIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - resolveDynamicDestinations().populateDisplayData(builder); - String tempDirectory = null; - if (getTempDirectory() != null) { - tempDirectory = - getTempDirectory().isAccessible() - ? getTempDirectory().get().toString() - : getTempDirectory().toString(); + String prefixString = ""; + if (getFilenamePrefix() != null) { + prefixString = getFilenamePrefix().isAccessible() + ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString(); } builder - .addIfNotDefault( - DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) - .addIfNotNull( - DisplayData.item("tempDirectory", tempDirectory) - .withLabel("Directory for temporary files")) - .addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header")) - .addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer")) - .add( - DisplayData.item( - "writableByteChannelFactory", getWritableByteChannelFactory().toString()) - .withLabel("Compression/Transformation Type")); + .addIfNotNull(DisplayData.item("filePrefix", prefixString) + .withLabel("Output File Prefix")) + .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) + .withLabel("Output File Suffix")) + .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate()) + .withLabel("Output Shard Name Template")) + .addIfNotDefault(DisplayData.item("numShards", getNumShards()) + .withLabel("Maximum Output Shards"), 0) + .addIfNotNull(DisplayData.item("fileHeader", getHeader()) + .withLabel("File Header")) + .addIfNotNull(DisplayData.item("fileFooter", getFooter()) + .withLabel("File Footer")) + .add(DisplayData + .item("writableByteChannelFactory", getWritableByteChannelFactory().toString()) + .withLabel("Compression/Transformation Type")); } @Override @@ -821,128 +493,6 @@ public class TextIO { } /** - * This class is used as the default return value of {@link TextIO#write()}. - * - * <p>All methods in this class delegate to the appropriate method of {@link TextIO.TypedWrite}. - * This class exists for backwards compatibility, and will be removed in Beam 3.0. - */ - public static class Write extends PTransform<PCollection<String>, PDone> { - @VisibleForTesting TypedWrite<String> inner; - - Write() { - this(TextIO.writeCustomType(SerializableFunctions.<String>identity())); - } - - Write(TypedWrite<String> inner) { - this.inner = inner; - } - - /** See {@link TypedWrite#to(String)}. */ - public Write to(String filenamePrefix) { - return new Write(inner.to(filenamePrefix)); - } - - /** See {@link TypedWrite#to(ResourceId)}. */ - @Experimental(Kind.FILESYSTEM) - public Write to(ResourceId filenamePrefix) { - return new Write(inner.to(filenamePrefix)); - } - - /** See {@link TypedWrite#to(ValueProvider)}. */ - public Write to(ValueProvider<String> outputPrefix) { - return new Write(inner.to(outputPrefix)); - } - - /** See {@link TypedWrite#toResource(ValueProvider)}. */ - @Experimental(Kind.FILESYSTEM) - public Write toResource(ValueProvider<ResourceId> filenamePrefix) { - return new Write(inner.toResource(filenamePrefix)); - } - - /** See {@link TypedWrite#to(FilenamePolicy)}. */ - @Experimental(Kind.FILESYSTEM) - public Write to(FilenamePolicy filenamePolicy) { - return new Write(inner.to(filenamePolicy)); - } - - /** See {@link TypedWrite#to(DynamicDestinations)}. */ - @Experimental(Kind.FILESYSTEM) - public Write to(DynamicDestinations<String, ?> dynamicDestinations) { - return new Write(inner.to(dynamicDestinations)); - } - - /** See {@link TypedWrite#to(SerializableFunction, Params)}. */ - @Experimental(Kind.FILESYSTEM) - public Write to( - SerializableFunction<String, Params> destinationFunction, Params emptyDestination) { - return new Write(inner.to(destinationFunction, emptyDestination)); - } - - /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ - @Experimental(Kind.FILESYSTEM) - public Write withTempDirectory(ValueProvider<ResourceId> tempDirectory) { - return new Write(inner.withTempDirectory(tempDirectory)); - } - - /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */ - @Experimental(Kind.FILESYSTEM) - public Write withTempDirectory(ResourceId tempDirectory) { - return new Write(inner.withTempDirectory(tempDirectory)); - } - - /** See {@link TypedWrite#withShardNameTemplate(String)}. */ - public Write withShardNameTemplate(String shardTemplate) { - return new Write(inner.withShardNameTemplate(shardTemplate)); - } - - /** See {@link TypedWrite#withSuffix(String)}. */ - public Write withSuffix(String filenameSuffix) { - return new Write(inner.withSuffix(filenameSuffix)); - } - - /** See {@link TypedWrite#withNumShards(int)}. */ - public Write withNumShards(int numShards) { - return new Write(inner.withNumShards(numShards)); - } - - /** See {@link TypedWrite#withoutSharding()}. */ - public Write withoutSharding() { - return new Write(inner.withoutSharding()); - } - - /** See {@link TypedWrite#withHeader(String)}. */ - public Write withHeader(@Nullable String header) { - return new Write(inner.withHeader(header)); - } - - /** See {@link TypedWrite#withFooter(String)}. */ - public Write withFooter(@Nullable String footer) { - return new Write(inner.withFooter(footer)); - } - - /** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */ - public Write withWritableByteChannelFactory( - WritableByteChannelFactory writableByteChannelFactory) { - return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory)); - } - - /** See {@link TypedWrite#withWindowedWrites}. */ - public Write withWindowedWrites() { - return new Write(inner.withWindowedWrites()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - inner.populateDisplayData(builder); - } - - @Override - public PDone expand(PCollection<String> input) { - return inner.expand(input); - } - } - - /** * Possible text file compression types. */ public enum CompressionType {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java index b57b28c..511d697 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java @@ -34,29 +34,27 @@ import org.apache.beam.sdk.util.MimeTypes; * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the * last) is terminated. */ -class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> { +class TextSink extends FileBasedSink<String> { @Nullable private final String header; @Nullable private final String footer; TextSink( ValueProvider<ResourceId> baseOutputFilename, - DynamicDestinations<UserT, DestinationT> dynamicDestinations, + FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer, WritableByteChannelFactory writableByteChannelFactory) { - super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory); + super(baseOutputFilename, filenamePolicy, writableByteChannelFactory); this.header = header; this.footer = footer; } - @Override - public WriteOperation<String, DestinationT> createWriteOperation() { - return new TextWriteOperation<>(this, header, footer); + public WriteOperation<String> createWriteOperation() { + return new TextWriteOperation(this, header, footer); } /** A {@link WriteOperation WriteOperation} for text files. */ - private static class TextWriteOperation<DestinationT> - extends WriteOperation<String, DestinationT> { + private static class TextWriteOperation extends WriteOperation<String> { @Nullable private final String header; @Nullable private final String footer; @@ -67,20 +65,20 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> } @Override - public Writer<String, DestinationT> createWriter() throws Exception { - return new TextWriter<>(this, header, footer); + public Writer<String> createWriter() throws Exception { + return new TextWriter(this, header, footer); } } /** A {@link Writer Writer} for text files. */ - private static class TextWriter<DestinationT> extends Writer<String, DestinationT> { + private static class TextWriter extends Writer<String> { private static final String NEWLINE = "\n"; @Nullable private final String header; @Nullable private final String footer; private OutputStreamWriter out; public TextWriter( - WriteOperation<String, DestinationT> writeOperation, + WriteOperation<String> writeOperation, @Nullable String header, @Nullable String footer) { super(writeOperation, MimeTypes.TEXT);
