http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 3bf5d5b..4e2b61c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -27,8 +27,10 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import java.io.IOException; import java.io.InputStream; @@ -40,7 +42,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -67,6 +68,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; @@ -74,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; +import 
org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; @@ -95,9 +98,9 @@ import org.slf4j.LoggerFactory; * <p>The process of writing to file-based sink is as follows: * * <ol> - * <li>An optional subclass-defined initialization, - * <li>a parallel write of bundles to temporary files, and finally, - * <li>these temporary files are renamed with final output filenames. + * <li>An optional subclass-defined initialization, + * <li>a parallel write of bundles to temporary files, and finally, + * <li>these temporary files are renamed with final output filenames. * </ol> * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the @@ -121,7 +124,8 @@ import org.slf4j.LoggerFactory; * @param <OutputT> the type of values written to the sink. */ @Experimental(Kind.FILESYSTEM) -public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData { +public abstract class FileBasedSink<UserT, DestinationT, OutputT> + implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); /** Directly supported file output compression types. */ @@ -199,7 +203,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } } - private final DynamicDestinations<?, DestinationT> dynamicDestinations; + private final DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations; /** * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the @@ -215,8 +219,54 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * destination type into an instance of {@link FilenamePolicy}. 
*/ @Experimental(Kind.FILESYSTEM) - public abstract static class DynamicDestinations<UserT, DestinationT> + public abstract static class DynamicDestinations<UserT, DestinationT, OutputT> implements HasDisplayData, Serializable { + interface SideInputAccessor { + <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view); + } + + private SideInputAccessor sideInputAccessor; + + static class SideInputAccessorViaProcessContext implements SideInputAccessor { + private DoFn<?, ?>.ProcessContext processContext; + + SideInputAccessorViaProcessContext(DoFn<?, ?>.ProcessContext processContext) { + this.processContext = processContext; + } + + @Override + public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) { + return processContext.sideInput(view); + } + } + + /** + * Override to specify that this object needs access to one or more side inputs. These side + * inputs must be globally windowed, as they will be accessed from the global window. + */ + public List<PCollectionView<?>> getSideInputs() { + return ImmutableList.of(); + } + + /** + * Returns the value of a given side input. The view must be present in {@link + * #getSideInputs()}. + */ + protected final <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) { + return sideInputAccessor.sideInput(view); + } + + final void setSideInputAccessor(SideInputAccessor sideInputAccessor) { + this.sideInputAccessor = sideInputAccessor; + } + + final void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) { + this.sideInputAccessor = new SideInputAccessorViaProcessContext(context); + } + + /** Convert an input record type into the output type. */ + public abstract OutputT formatRecord(UserT record); + /** * Returns an object that represents at a high level the destination being written to. May not * return null. A destination must have deterministic hash and equality methods defined.
@@ -256,12 +306,13 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab return destinationCoder; } // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. - @Nullable TypeDescriptor<DestinationT> descriptor = + @Nullable + TypeDescriptor<DestinationT> descriptor = extractFromTypeParameters( this, DynamicDestinations.class, new TypeVariableExtractor< - DynamicDestinations<UserT, DestinationT>, DestinationT>() {}); + DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {}); checkArgument( descriptor != null, "Unable to infer a coder for DestinationT, " @@ -323,7 +374,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab @Experimental(Kind.FILESYSTEM) public FileBasedSink( ValueProvider<ResourceId> tempDirectoryProvider, - DynamicDestinations<?, DestinationT> dynamicDestinations) { + DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) { this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED); } @@ -331,7 +382,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab @Experimental(Kind.FILESYSTEM) public FileBasedSink( ValueProvider<ResourceId> tempDirectoryProvider, - DynamicDestinations<?, DestinationT> dynamicDestinations, + DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations, WritableByteChannelFactory writableByteChannelFactory) { this.tempDirectoryProvider = NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory()); @@ -341,8 +392,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab /** Return the {@link DynamicDestinations} used. 
*/ @SuppressWarnings("unchecked") - public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() { - return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations; + public DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() { + return (DynamicDestinations<UserT, DestinationT, OutputT>) dynamicDestinations; } /** @@ -357,7 +408,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab public void validate(PipelineOptions options) {} /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */ - public abstract WriteOperation<OutputT, DestinationT> createWriteOperation(); + public abstract WriteOperation<DestinationT, OutputT> createWriteOperation(); public void populateDisplayData(DisplayData.Builder builder) { getDynamicDestinations().populateDisplayData(builder); @@ -371,11 +422,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * written, * * <ol> - * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the - * output bundles. - * <li>During finalize, these temporary files are copied to final output locations and named - * according to a file naming template. - * <li>Finally, any temporary files that were created during the write are removed. + * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the + * output bundles. + * <li>During finalize, these temporary files are copied to final output locations and named + * according to a file naming template. + * <li>Finally, any temporary files that were created during the write are removed. * </ol> * * <p>Subclass implementations of WriteOperation must implement {@link @@ -400,9 +451,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * * @param <OutputT> the type of values written to the sink. 
*/ - public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable { + public abstract static class WriteOperation<DestinationT, OutputT> implements Serializable { /** The Sink that this WriteOperation will write to. */ - protected final FileBasedSink<OutputT, DestinationT> sink; + protected final FileBasedSink<?, DestinationT, OutputT> sink; /** Directory for temporary output files. */ protected final ValueProvider<ResourceId> tempDirectory; @@ -428,7 +479,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * * @param sink the FileBasedSink that will be used to configure this write operation. */ - public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) { + public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink) { this( sink, NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder())); @@ -463,12 +514,12 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * @param tempDirectory the base directory to be used for temporary output files. */ @Experimental(Kind.FILESYSTEM) - public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) { + public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink, ResourceId tempDirectory) { this(sink, StaticValueProvider.of(tempDirectory)); } private WriteOperation( - FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) { + FileBasedSink<?, DestinationT, OutputT> sink, ValueProvider<ResourceId> tempDirectory) { this.sink = sink; this.tempDirectory = tempDirectory; this.windowedWrites = false; @@ -478,7 +529,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * Clients must implement to return a subclass of {@link Writer}. This method must not mutate * the state of the object. 
*/ - public abstract Writer<OutputT, DestinationT> createWriter() throws Exception; + public abstract Writer<DestinationT, OutputT> createWriter() throws Exception; /** Indicates that the operation will be performing windowed writes. */ public void setWindowedWrites(boolean windowedWrites) { @@ -533,7 +584,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab protected final Map<ResourceId, ResourceId> buildOutputFilenames( Iterable<FileResult<DestinationT>> writerResults) { int numShards = Iterables.size(writerResults); - Map<ResourceId, ResourceId> outputFilenames = new HashMap<>(); + Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap(); // Either all results have a shard number set (if the sink is configured with a fixed // number of shards), or they all don't (otherwise). @@ -597,7 +648,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab "Only generated %s distinct file names for %s files.", numDistinctShards, outputFilenames.size()); - return outputFilenames; } @@ -691,7 +741,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } /** Returns the FileBasedSink for this write operation. */ - public FileBasedSink<OutputT, DestinationT> getSink() { + public FileBasedSink<?, DestinationT, OutputT> getSink() { return sink; } @@ -727,10 +777,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * * @param <OutputT> the type of values to write. */ - public abstract static class Writer<OutputT, DestinationT> { + public abstract static class Writer<DestinationT, OutputT> { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); - private final WriteOperation<OutputT, DestinationT> writeOperation; + private final WriteOperation<DestinationT, OutputT> writeOperation; /** Unique id for this output bundle. 
*/ private String id; @@ -757,7 +807,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab private final String mimeType; /** Construct a new {@link Writer} that will produce files of the given MIME type. */ - public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) { + public Writer(WriteOperation<DestinationT, OutputT> writeOperation, String mimeType) { checkNotNull(writeOperation); this.writeOperation = writeOperation; this.mimeType = mimeType; @@ -930,9 +980,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } /** Return the WriteOperation that this Writer belongs to. */ - public WriteOperation<OutputT, DestinationT> getWriteOperation() { + public WriteOperation<DestinationT, OutputT> getWriteOperation() { return writeOperation; } + + /** Return the user destination object for this writer. */ + public DestinationT getDestination() { + return destination; + } } /** @@ -987,7 +1042,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab @Experimental(Kind.FILESYSTEM) public ResourceId getDestinationFile( - DynamicDestinations<?, DestinationT> dynamicDestinations, + DynamicDestinations<?, DestinationT, ?> dynamicDestinations, int numShards, OutputFileHints outputFileHints) { checkArgument(getShard() != UNKNOWN_SHARDNUM);
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 6e7b243..29b3e29 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -357,10 +356,12 @@ public class TFRecordIO { checkState(getOutputPrefix() != null, "need to set the output prefix of a TFRecordIO.Write transform"); WriteFiles<byte[], Void, byte[]> write = - WriteFiles.<byte[], Void, byte[]>to( + WriteFiles.to( new TFRecordSink( - getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()), - SerializableFunctions.<byte[]>identity()); + getOutputPrefix(), + getShardTemplate(), + getFilenameSuffix(), + getCompressionType())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -548,7 +549,7 @@ public class TFRecordIO { /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. 
*/ @VisibleForTesting - static class TFRecordSink extends FileBasedSink<byte[], Void> { + static class TFRecordSink extends FileBasedSink<byte[], Void, byte[]> { @VisibleForTesting TFRecordSink( ValueProvider<ResourceId> outputPrefix, @@ -557,7 +558,7 @@ public class TFRecordIO { TFRecordIO.CompressionType compressionType) { super( outputPrefix, - DynamicFileDestinations.constant( + DynamicFileDestinations.<byte[]>constant( DefaultFilenamePolicy.fromStandardParameters( outputPrefix, shardTemplate, suffix, false)), writableByteChannelFactory(compressionType)); @@ -571,7 +572,7 @@ public class TFRecordIO { } @Override - public WriteOperation<byte[], Void> createWriteOperation() { + public WriteOperation<Void, byte[]> createWriteOperation() { return new TFRecordWriteOperation(this); } @@ -591,23 +592,23 @@ public class TFRecordIO { } /** A {@link WriteOperation WriteOperation} for TFRecord files. */ - private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> { + private static class TFRecordWriteOperation extends WriteOperation<Void, byte[]> { private TFRecordWriteOperation(TFRecordSink sink) { super(sink); } @Override - public Writer<byte[], Void> createWriter() throws Exception { + public Writer<Void, byte[]> createWriter() throws Exception { return new TFRecordWriter(this); } } /** A {@link Writer Writer} for TFRecord files. 
*/ - private static class TFRecordWriter extends Writer<byte[], Void> { + private static class TFRecordWriter extends Writer<Void, byte[]> { private WritableByteChannel outChannel; private TFRecordCodec codec; - private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) { + private TFRecordWriter(WriteOperation<Void, byte[]> writeOperation) { super(writeOperation, MimeTypes.BINARY); } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 765a842..312dc07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -23,6 +23,10 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -52,8 +56,8 @@ import org.apache.beam.sdk.values.PDone; * * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the - * file(s) to be read. Alternatively, if the filenames to be read are themselves in a - * {@link PCollection}, apply {@link TextIO#readAll()}. + * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link + * PCollection}, apply {@link TextIO#readAll()}. 
* * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', @@ -70,8 +74,8 @@ import org.apache.beam.sdk.values.PDone; * * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and - * scalability. Note that it may decrease performance if the filepattern matches only a small - * number of files. + * scalability. Note that it may decrease performance if the filepattern matches only a small number + * of files. * * <p>Example 2: reading a PCollection of filenames. * @@ -121,9 +125,9 @@ import org.apache.beam.sdk.values.PDone; * allows you to convert any input value into a custom destination object, and map that destination * object to a {@link FilenamePolicy}. This allows using different filename policies (or more * commonly, differently-configured instances of the same policy) based on the input record. Often - * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which - * allows your {@link DynamicDestinations} object to examine the input type and takes a format - * function to convert that type to a string for writing. + * this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link + * DynamicDestinations} object to examine the input type and takes a format function to convert that + * type to a string for writing. * * <p>A convenience shortcut is provided for the case where the default naming policy is used, but * different configurations of this policy are wanted based on the input record. Default naming @@ -189,20 +193,23 @@ public class TextIO { * line. 
* * <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type - * {@link T}, along with a format function that converts the input type {@link T} to the String - * that will be written to the file. The advantage of this is it allows a user-provided {@link + * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the String that + * will be written to the file must be specified. If using a custom {@link DynamicDestinations} + * object this is done using {@link DynamicDestinations#formatRecord}, otherwise the {@link + * TypedWrite#withFormatFunction} can be used to specify a format function. + * + * <p>The advantage of using a custom type is that is it allows a user-provided {@link * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the - * user's custom type when choosing a destination. + * custom type when choosing a destination. */ - public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) { - return new AutoValue_TextIO_TypedWrite.Builder<T>() + public static <UserT> TypedWrite<UserT> writeCustomType() { + return new AutoValue_TextIO_TypedWrite.Builder<UserT>() .setFilenamePrefix(null) .setTempDirectory(null) .setShardTemplate(null) .setFilenameSuffix(null) .setFilenamePolicy(null) .setDynamicDestinations(null) - .setFormatFunction(formatFunction) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) .setNumShards(0) @@ -417,11 +424,11 @@ public class TextIO { } } - ///////////////////////////////////////////////////////////////////////////// + // /////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. 
*/ @AutoValue - public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> { + public abstract static class TypedWrite<UserT> extends PTransform<PCollection<UserT>, PDone> { /** The prefix of each file written, combined with suffix and shardTemplate. */ @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); @@ -449,10 +456,19 @@ public class TextIO { /** Allows for value-dependent {@link DynamicDestinations} to be vended. */ @Nullable - abstract DynamicDestinations<T, ?> getDynamicDestinations(); + abstract DynamicDestinations<UserT, ?, String> getDynamicDestinations(); + + @Nullable + /** A destination function for using {@link DefaultFilenamePolicy} */ + abstract SerializableFunction<UserT, Params> getDestinationFunction(); - /** A function that converts T to a String, for writing to the file. */ - abstract SerializableFunction<T, String> getFormatFunction(); + @Nullable + /** A default destination for empty PCollections. */ + abstract Params getEmptyDestination(); + + /** A function that converts UserT to a String, for writing to the file. */ + @Nullable + abstract SerializableFunction<UserT, String> getFormatFunction(); /** Whether to write windowed output files. 
*/ abstract boolean getWindowedWrites(); @@ -463,37 +479,42 @@ public class TextIO { */ abstract WritableByteChannelFactory getWritableByteChannelFactory(); - abstract Builder<T> toBuilder(); + abstract Builder<UserT> toBuilder(); @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); + abstract static class Builder<UserT> { + abstract Builder<UserT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); + + abstract Builder<UserT> setTempDirectory(ValueProvider<ResourceId> tempDirectory); - abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory); + abstract Builder<UserT> setShardTemplate(@Nullable String shardTemplate); - abstract Builder<T> setShardTemplate(@Nullable String shardTemplate); + abstract Builder<UserT> setFilenameSuffix(@Nullable String filenameSuffix); - abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix); + abstract Builder<UserT> setHeader(@Nullable String header); - abstract Builder<T> setHeader(@Nullable String header); + abstract Builder<UserT> setFooter(@Nullable String footer); - abstract Builder<T> setFooter(@Nullable String footer); + abstract Builder<UserT> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); - abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); + abstract Builder<UserT> setDynamicDestinations( + @Nullable DynamicDestinations<UserT, ?, String> dynamicDestinations); - abstract Builder<T> setDynamicDestinations( - @Nullable DynamicDestinations<T, ?> dynamicDestinations); + abstract Builder<UserT> setDestinationFunction( + @Nullable SerializableFunction<UserT, Params> destinationFunction); - abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction); + abstract Builder<UserT> setEmptyDestination(Params emptyDestination); - abstract Builder<T> setNumShards(int numShards); + abstract Builder<UserT> 
setFormatFunction(SerializableFunction<UserT, String> formatFunction); - abstract Builder<T> setWindowedWrites(boolean windowedWrites); + abstract Builder<UserT> setNumShards(int numShards); - abstract Builder<T> setWritableByteChannelFactory( + abstract Builder<UserT> setWindowedWrites(boolean windowedWrites); + + abstract Builder<UserT> setWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory); - abstract TypedWrite<T> build(); + abstract TypedWrite<UserT> build(); } /** @@ -513,18 +534,18 @@ public class TextIO { * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to * infer a directory for temporary files. */ - public TypedWrite<T> to(String filenamePrefix) { + public TypedWrite<UserT> to(String filenamePrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)); } /** Like {@link #to(String)}. */ @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> to(ResourceId filenamePrefix) { + public TypedWrite<UserT> to(ResourceId filenamePrefix) { return toResource(StaticValueProvider.of(filenamePrefix)); } /** Like {@link #to(String)}. */ - public TypedWrite<T> to(ValueProvider<String> outputPrefix) { + public TypedWrite<UserT> to(ValueProvider<String> outputPrefix) { return toResource(NestedValueProvider.of(outputPrefix, new SerializableFunction<String, ResourceId>() { @Override @@ -538,7 +559,7 @@ public class TextIO { * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A * directory for temporary files must be specified using {@link #withTempDirectory}. */ - public TypedWrite<T> to(FilenamePolicy filenamePolicy) { + public TypedWrite<UserT> to(FilenamePolicy filenamePolicy) { return toBuilder().setFilenamePolicy(filenamePolicy).build(); } @@ -547,7 +568,7 @@ public class TextIO { * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for * temporary files must be specified using {@link #withTempDirectory}. 
*/ - public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) { + public TypedWrite<UserT> to(DynamicDestinations<UserT, ?, String> dynamicDestinations) { return toBuilder().setDynamicDestinations(dynamicDestinations).build(); } @@ -558,26 +579,39 @@ public class TextIO { * emptyDestination parameter specified where empty files should be written for when the written * {@link PCollection} is empty. */ - public TypedWrite<T> to( - SerializableFunction<T, Params> destinationFunction, Params emptyDestination) { - return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination)); + public TypedWrite<UserT> to( + SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) { + return toBuilder() + .setDestinationFunction(destinationFunction) + .setEmptyDestination(emptyDestination) + .build(); } /** Like {@link #to(ResourceId)}. */ @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) { + public TypedWrite<UserT> toResource(ValueProvider<ResourceId> filenamePrefix) { return toBuilder().setFilenamePrefix(filenamePrefix).build(); } + /** + * Specifies a format function to convert {@link UserT} to the output type. If {@link + * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be + * used instead. + */ + public TypedWrite<UserT> withFormatFunction( + SerializableFunction<UserT, String> formatFunction) { + return toBuilder().setFormatFunction(formatFunction).build(); + } + /** Set the base directory used to generate temporary files. */ @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { + public TypedWrite<UserT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { return toBuilder().setTempDirectory(tempDirectory).build(); } /** Set the base directory used to generate temporary files. 
*/ @Experimental(Kind.FILESYSTEM) - public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) { + public TypedWrite<UserT> withTempDirectory(ResourceId tempDirectory) { return withTempDirectory(StaticValueProvider.of(tempDirectory)); } @@ -589,7 +623,7 @@ public class TextIO { * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public TypedWrite<T> withShardNameTemplate(String shardTemplate) { + public TypedWrite<UserT> withShardNameTemplate(String shardTemplate) { return toBuilder().setShardTemplate(shardTemplate).build(); } @@ -601,7 +635,7 @@ public class TextIO { * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public TypedWrite<T> withSuffix(String filenameSuffix) { + public TypedWrite<UserT> withSuffix(String filenameSuffix) { return toBuilder().setFilenameSuffix(filenameSuffix).build(); } @@ -615,7 +649,7 @@ public class TextIO { * * @param numShards the number of shards to use, or 0 to let the system decide. */ - public TypedWrite<T> withNumShards(int numShards) { + public TypedWrite<UserT> withNumShards(int numShards) { checkArgument(numShards >= 0); return toBuilder().setNumShards(numShards).build(); } @@ -629,7 +663,7 @@ public class TextIO { * * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")} */ - public TypedWrite<T> withoutSharding() { + public TypedWrite<UserT> withoutSharding() { return withNumShards(1).withShardNameTemplate(""); } @@ -638,7 +672,7 @@ public class TextIO { * * <p>A {@code null} value will clear any previously configured header. */ - public TypedWrite<T> withHeader(@Nullable String header) { + public TypedWrite<UserT> withHeader(@Nullable String header) { return toBuilder().setHeader(header).build(); } @@ -647,7 +681,7 @@ public class TextIO { * * <p>A {@code null} value will clear any previously configured footer. 
*/ - public TypedWrite<T> withFooter(@Nullable String footer) { + public TypedWrite<UserT> withFooter(@Nullable String footer) { return toBuilder().setFooter(footer).build(); } @@ -658,7 +692,7 @@ public class TextIO { * * <p>A {@code null} value will reset the value to the default value mentioned above. */ - public TypedWrite<T> withWritableByteChannelFactory( + public TypedWrite<UserT> withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build(); } @@ -669,36 +703,58 @@ public class TextIO { * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. */ - public TypedWrite<T> withWindowedWrites() { + public TypedWrite<UserT> withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } - private DynamicDestinations<T, ?> resolveDynamicDestinations() { - DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); + private DynamicDestinations<UserT, ?, String> resolveDynamicDestinations() { + DynamicDestinations<UserT, ?, String> dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { - FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); - if (usedFilenamePolicy == null) { - usedFilenamePolicy = - DefaultFilenamePolicy.fromStandardParameters( - getFilenamePrefix(), - getShardTemplate(), - getFilenameSuffix(), - getWindowedWrites()); + if (getDestinationFunction() != null) { + dynamicDestinations = + DynamicFileDestinations.toDefaultPolicies( + getDestinationFunction(), getEmptyDestination(), getFormatFunction()); + } else { + FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); + if (usedFilenamePolicy == null) { + usedFilenamePolicy = + DefaultFilenamePolicy.fromStandardParameters( + getFilenamePrefix(), + getShardTemplate(), + getFilenameSuffix(), + 
getWindowedWrites()); + } + dynamicDestinations = + DynamicFileDestinations.constant(usedFilenamePolicy, getFormatFunction()); } - dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy); } return dynamicDestinations; } @Override - public PDone expand(PCollection<T> input) { + public PDone expand(PCollection<UserT> input) { checkState( getFilenamePrefix() != null || getTempDirectory() != null, "Need to set either the filename prefix or the tempDirectory of a TextIO.Write " + "transform."); - checkState( - getFilenamePolicy() == null || getDynamicDestinations() == null, - "Cannot specify both a filename policy and dynamic destinations"); + + List<?> allToArgs = + Lists.newArrayList( + getFilenamePolicy(), + getDynamicDestinations(), + getFilenamePrefix(), + getDestinationFunction()); + checkArgument( + 1 == Iterables.size(Iterables.filter(allToArgs, Predicates.notNull())), + "Exactly one of filename policy, dynamic destinations, filename prefix, or destination " + + "function must be set"); + + if (getDynamicDestinations() != null) { + checkArgument( + getFormatFunction() == null, + "A format function should not be specified " + + "with DynamicDestinations. 
Use DynamicDestinations.formatRecord instead"); + } if (getFilenamePolicy() != null || getDynamicDestinations() != null) { checkState( getShardTemplate() == null && getFilenameSuffix() == null, @@ -709,20 +765,20 @@ public class TextIO { } public <DestinationT> PDone expandTyped( - PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { + PCollection<UserT> input, + DynamicDestinations<UserT, DestinationT, String> dynamicDestinations) { ValueProvider<ResourceId> tempDirectory = getTempDirectory(); if (tempDirectory == null) { tempDirectory = getFilenamePrefix(); } - WriteFiles<T, DestinationT, String> write = + WriteFiles<UserT, DestinationT, String> write = WriteFiles.to( new TextSink<>( tempDirectory, dynamicDestinations, getHeader(), getFooter(), - getWritableByteChannelFactory()), - getFormatFunction()); + getWritableByteChannelFactory())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -774,7 +830,7 @@ public class TextIO { @VisibleForTesting TypedWrite<String> inner; Write() { - this(TextIO.writeCustomType(SerializableFunctions.<String>identity())); + this(TextIO.<String>writeCustomType()); } Write(TypedWrite<String> inner) { @@ -783,43 +839,53 @@ public class TextIO { /** See {@link TypedWrite#to(String)}. */ public Write to(String filenamePrefix) { - return new Write(inner.to(filenamePrefix)); + return new Write( + inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity())); } /** See {@link TypedWrite#to(ResourceId)}. */ @Experimental(Kind.FILESYSTEM) public Write to(ResourceId filenamePrefix) { - return new Write(inner.to(filenamePrefix)); + return new Write( + inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity())); } /** See {@link TypedWrite#to(ValueProvider)}. 
*/ public Write to(ValueProvider<String> outputPrefix) { - return new Write(inner.to(outputPrefix)); + return new Write( + inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<String>identity())); } /** See {@link TypedWrite#toResource(ValueProvider)}. */ @Experimental(Kind.FILESYSTEM) public Write toResource(ValueProvider<ResourceId> filenamePrefix) { - return new Write(inner.toResource(filenamePrefix)); + return new Write( + inner + .toResource(filenamePrefix) + .withFormatFunction(SerializableFunctions.<String>identity())); } /** See {@link TypedWrite#to(FilenamePolicy)}. */ @Experimental(Kind.FILESYSTEM) public Write to(FilenamePolicy filenamePolicy) { - return new Write(inner.to(filenamePolicy)); + return new Write( + inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<String>identity())); } /** See {@link TypedWrite#to(DynamicDestinations)}. */ @Experimental(Kind.FILESYSTEM) - public Write to(DynamicDestinations<String, ?> dynamicDestinations) { - return new Write(inner.to(dynamicDestinations)); + public Write to(DynamicDestinations<String, ?, String> dynamicDestinations) { + return new Write(inner.to(dynamicDestinations).withFormatFunction(null)); } /** See {@link TypedWrite#to(SerializableFunction, Params)}. */ @Experimental(Kind.FILESYSTEM) public Write to( SerializableFunction<String, Params> destinationFunction, Params emptyDestination) { - return new Write(inner.to(destinationFunction, emptyDestination)); + return new Write( + inner + .to(destinationFunction, emptyDestination) + .withFormatFunction(SerializableFunctions.<String>identity())); } /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java index b57b28c..387e0ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java @@ -34,13 +34,13 @@ import org.apache.beam.sdk.util.MimeTypes; * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the * last) is terminated. */ -class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> { +class TextSink<UserT, DestinationT> extends FileBasedSink<UserT, DestinationT, String> { @Nullable private final String header; @Nullable private final String footer; TextSink( ValueProvider<ResourceId> baseOutputFilename, - DynamicDestinations<UserT, DestinationT> dynamicDestinations, + DynamicDestinations<UserT, DestinationT, String> dynamicDestinations, @Nullable String header, @Nullable String footer, WritableByteChannelFactory writableByteChannelFactory) { @@ -50,13 +50,13 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> } @Override - public WriteOperation<String, DestinationT> createWriteOperation() { + public WriteOperation<DestinationT, String> createWriteOperation() { return new TextWriteOperation<>(this, header, footer); } /** A {@link WriteOperation WriteOperation} for text files. 
*/ private static class TextWriteOperation<DestinationT> - extends WriteOperation<String, DestinationT> { + extends WriteOperation<DestinationT, String> { @Nullable private final String header; @Nullable private final String footer; @@ -67,20 +67,20 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> } @Override - public Writer<String, DestinationT> createWriter() throws Exception { + public Writer<DestinationT, String> createWriter() throws Exception { return new TextWriter<>(this, header, footer); } } /** A {@link Writer Writer} for text files. */ - private static class TextWriter<DestinationT> extends Writer<String, DestinationT> { + private static class TextWriter<DestinationT> extends Writer<DestinationT, String> { private static final String NEWLINE = "\n"; @Nullable private final String header; @Nullable private final String footer; private OutputStreamWriter out; public TextWriter( - WriteOperation<String, DestinationT> writeOperation, + WriteOperation<DestinationT, String> writeOperation, @Nullable String header, @Nullable String footer) { super(writeOperation, MimeTypes.TEXT); http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index d8d7478..85c5652 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.View; import 
org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -76,7 +75,9 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -121,9 +122,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; static final int UNKNOWN_SHARDNUM = -1; - private FileBasedSink<OutputT, DestinationT> sink; - private SerializableFunction<UserT, OutputT> formatFunction; - private WriteOperation<OutputT, DestinationT> writeOperation; + private FileBasedSink<UserT, DestinationT, OutputT> sink; + private WriteOperation<DestinationT, OutputT> writeOperation; // This allows the number of shards to be dynamically computed based on the input // PCollection. @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards; @@ -133,37 +133,44 @@ public class WriteFiles<UserT, DestinationT, OutputT> private final ValueProvider<Integer> numShardsProvider; private final boolean windowedWrites; private int maxNumWritersPerBundle; + // This is the set of side inputs used by this transform. This is usually populated by the users's + // DynamicDestinations object. + private final List<PCollectionView<?>> sideInputs; /** * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting * the runner control how many different shards are produced. 
*/ public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to( - FileBasedSink<OutputT, DestinationT> sink, - SerializableFunction<UserT, OutputT> formatFunction) { + FileBasedSink<UserT, DestinationT, OutputT> sink) { checkNotNull(sink, "sink"); return new WriteFiles<>( sink, - formatFunction, null /* runner-determined sharding */, null, false, - DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE); + DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE, + sink.getDynamicDestinations().getSideInputs()); } private WriteFiles( - FileBasedSink<OutputT, DestinationT> sink, - SerializableFunction<UserT, OutputT> formatFunction, + FileBasedSink<UserT, DestinationT, OutputT> sink, @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards, @Nullable ValueProvider<Integer> numShardsProvider, boolean windowedWrites, - int maxNumWritersPerBundle) { + int maxNumWritersPerBundle, + List<PCollectionView<?>> sideInputs) { this.sink = sink; - this.formatFunction = checkNotNull(formatFunction); this.computeNumShards = computeNumShards; this.numShardsProvider = numShardsProvider; this.windowedWrites = windowedWrites; this.maxNumWritersPerBundle = maxNumWritersPerBundle; + this.sideInputs = sideInputs; + } + + @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return PCollectionViews.toAdditionalInputs(sideInputs); } @Override @@ -207,15 +214,10 @@ public class WriteFiles<UserT, DestinationT, OutputT> } /** Returns the {@link FileBasedSink} associated with this PTransform. */ - public FileBasedSink<OutputT, DestinationT> getSink() { + public FileBasedSink<UserT, DestinationT, OutputT> getSink() { return sink; } - /** Returns the the format function that maps the user type to the record written to files. */ - public SerializableFunction<UserT, OutputT> getFormatFunction() { - return formatFunction; - } - /** * Returns whether or not to perform windowed writes. 
*/ @@ -266,11 +268,11 @@ public class WriteFiles<UserT, DestinationT, OutputT> ValueProvider<Integer> numShardsProvider) { return new WriteFiles<>( sink, - formatFunction, computeNumShards, numShardsProvider, windowedWrites, - maxNumWritersPerBundle); + maxNumWritersPerBundle, + sideInputs); } /** Set the maximum number of writers created in a bundle before spilling to shuffle. */ @@ -278,11 +280,22 @@ public class WriteFiles<UserT, DestinationT, OutputT> int maxNumWritersPerBundle) { return new WriteFiles<>( sink, - formatFunction, computeNumShards, numShardsProvider, windowedWrites, - maxNumWritersPerBundle); + maxNumWritersPerBundle, + sideInputs); + } + + public WriteFiles<UserT, DestinationT, OutputT> withSideInputs( + List<PCollectionView<?>> sideInputs) { + return new WriteFiles<>( + sink, + computeNumShards, + numShardsProvider, + windowedWrites, + maxNumWritersPerBundle, + sideInputs); } /** @@ -297,7 +310,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> checkNotNull( sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead"); return new WriteFiles<>( - sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle); + sink, sharding, null, windowedWrites, maxNumWritersPerBundle, sideInputs); } /** @@ -305,8 +318,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> * runner-determined sharding. 
*/ public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() { - return new WriteFiles<>( - sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle); + return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle, sideInputs); } /** @@ -323,7 +335,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> */ public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() { return new WriteFiles<>( - sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle); + sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs); } private static class WriterKey<DestinationT> { @@ -374,7 +386,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> private final Coder<DestinationT> destinationCoder; private final boolean windowedWrites; - private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers; + private Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers; private int spilledShardNum = UNKNOWN_SHARDNUM; WriteBundles( @@ -394,6 +406,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); PaneInfo paneInfo = c.pane(); // If we are doing windowed writes, we need to ensure that we have separate files for // data in different windows/panes. Similar for dynamic writes, make sure that different @@ -402,7 +415,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> // the map will only have a single element. 
DestinationT destination = sink.getDynamicDestinations().getDestination(c.element()); WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination); - Writer<OutputT, DestinationT> writer = writers.get(key); + Writer<DestinationT, OutputT> writer = writers.get(key); if (writer == null) { if (writers.size() <= maxNumWritersPerBundle) { String uuid = UUID.randomUUID().toString(); @@ -436,14 +449,14 @@ public class WriteFiles<UserT, DestinationT, OutputT> return; } } - writeOrClose(writer, formatFunction.apply(c.element())); + writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element())); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry : + for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { - Writer<OutputT, DestinationT> writer = entry.getValue(); + Writer<DestinationT, OutputT> writer = entry.getValue(); FileResult<DestinationT> result; try { result = writer.close(); @@ -478,13 +491,14 @@ public class WriteFiles<UserT, DestinationT, OutputT> @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); // Since we key by a 32-bit hash of the destination, there might be multiple destinations // in this iterable. The number of destinations is generally very small (1000s or less), so // there will rarely be hash collisions. 
- Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap(); + Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap(); for (UserT input : c.element().getValue()) { DestinationT destination = sink.getDynamicDestinations().getDestination(input); - Writer<OutputT, DestinationT> writer = writers.get(destination); + Writer<DestinationT, OutputT> writer = writers.get(destination); if (writer == null) { LOG.debug("Opening writer for write operation {}", writeOperation); writer = writeOperation.createWriter(); @@ -501,12 +515,12 @@ public class WriteFiles<UserT, DestinationT, OutputT> LOG.debug("Done opening writer"); writers.put(destination, writer); } - writeOrClose(writer, formatFunction.apply(input)); - } + writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input)); + } // Close all writers. - for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) { - Writer<OutputT, DestinationT> writer = entry.getValue(); + for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { + Writer<DestinationT, OutputT> writer = entry.getValue(); FileResult<DestinationT> result; try { // Close the writer; if this throws let the error propagate. 
@@ -526,8 +540,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - private static <OutputT, DestinationT> void writeOrClose( - Writer<OutputT, DestinationT> writer, OutputT t) throws Exception { + private static <DestinationT, OutputT> void writeOrClose( + Writer<DestinationT, OutputT> writer, OutputT t) throws Exception { try { writer.write(t); } catch (Exception e) { @@ -678,6 +692,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> input.apply( writeName, ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder)) + .withSideInputs(sideInputs) .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); PCollection<FileResult<DestinationT>> writtenBundleFiles = writeTuple @@ -692,17 +707,18 @@ public class WriteFiles<UserT, DestinationT, OutputT> .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create()) .apply( "WriteUnwritten", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))) + ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)) + .withSideInputs(sideInputs)) .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); results = PCollectionList.of(writtenBundleFiles) .and(writtenGroupedFiles) .apply(Flatten.<FileResult<DestinationT>>pCollections()); } else { - List<PCollectionView<?>> sideInputs = Lists.newArrayList(); + List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(); if (computeNumShards != null) { numShardsView = input.apply(computeNumShards); - sideInputs.add(numShardsView); + shardingSideInputs.add(numShardsView); } else { numShardsView = null; } @@ -715,7 +731,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> numShardsView, (numShardsView != null) ? 
null : numShardsProvider, destinationCoder)) - .withSideInputs(sideInputs)) + .withSideInputs(shardingSideInputs)) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()); shardedWindowCoder = @@ -728,7 +744,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> results = sharded.apply( "WriteShardedBundles", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))); + ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)) + .withSideInputs(sideInputs)); } results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); @@ -773,11 +790,12 @@ public class WriteFiles<UserT, DestinationT, OutputT> } else { final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = results.apply(View.<FileResult<DestinationT>>asIterable()); - ImmutableList.Builder<PCollectionView<?>> sideInputs = + ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs = ImmutableList.<PCollectionView<?>>builder().add(resultsView); if (numShardsView != null) { - sideInputs.add(numShardsView); + finalizeSideInputs.add(numShardsView); } + finalizeSideInputs.addAll(sideInputs); // Finalize the write in another do-once ParDo on the singleton collection containing the // Writer. The results from the per-bundle writes are given as an Iterable side input. @@ -794,7 +812,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> new DoFn<Void, Integer>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { - LOG.info("Finalizing write operation {}.", writeOperation); + sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); // We must always output at least 1 shard, and honor user-specified numShards // if // set. 
@@ -827,7 +845,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> writeOperation.removeTemporaryFiles(tempFiles); } }) - .withSideInputs(sideInputs.build())); + .withSideInputs(finalizeSideInputs.build())); } return PDone.in(input.getPipeline()); } @@ -857,7 +875,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> minShardsNeeded, destination); for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<OutputT, DestinationT> writer = writeOperation.createWriter(); + Writer<DestinationT, OutputT> writer = writeOperation.createWriter(); // Currently this code path is only called in the unwindowed case. writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination); FileResult<DestinationT> emptyWrite = writer.close(); http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 154ff5a..a96b6be 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -30,9 +30,11 @@ import static org.junit.Assert.assertTrue; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -41,6 +43,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Random; import java.util.Set; @@ 
-48,6 +51,7 @@ import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; @@ -55,6 +59,7 @@ import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -68,6 +73,7 @@ import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -77,6 +83,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -535,17 +542,147 @@ public class AvroIOTest { assertThat(actualElements, containsInAnyOrder(allElements.toArray())); } + private static final String SCHEMA_TEMPLATE_STRING = + "{\"namespace\": \"example.avro\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestTemplateSchema$$\",\n" + + " \"fields\": [\n" + + " {\"name\": 
\"$$full\", \"type\": \"string\"},\n" + + " {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n" + + " ]\n" + + "}"; + + private static String schemaFromPrefix(String prefix) { + return SCHEMA_TEMPLATE_STRING.replace("$$", prefix); + } + + private static GenericRecord createRecord(String record, String prefix, Schema schema) { + GenericRecord genericRecord = new GenericData.Record(schema); + genericRecord.put(prefix + "full", record); + genericRecord.put(prefix + "suffix", record.substring(1)); + return genericRecord; + } + + private static class TestDynamicDestinations + extends DynamicAvroDestinations<String, String, GenericRecord> { + ResourceId baseDir; + PCollectionView<Map<String, String>> schemaView; + + TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, String>> schemaView) { + this.baseDir = baseDir; + this.schemaView = schemaView; + } + + @Override + public Schema getSchema(String destination) { + // Return a per-destination schema. + String schema = sideInput(schemaView).get(destination); + return new Schema.Parser().parse(schema); + } + + @Override + public List<PCollectionView<?>> getSideInputs() { + return ImmutableList.<PCollectionView<?>>of(schemaView); + } + + @Override + public GenericRecord formatRecord(String record) { + String prefix = record.substring(0, 1); + return createRecord(record, prefix, getSchema(prefix)); + } + + @Override + public String getDestination(String element) { + // Destination is based on first character of string. 
+ return element.substring(0, 1); + } + + @Override + public String getDefaultDestination() { + return ""; + } + + @Override + public FilenamePolicy getFilenamePolicy(String destination) { + return DefaultFilenamePolicy.fromStandardParameters( + StaticValueProvider.of( + baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)), + null, + null, + false); + } + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinations() throws Exception { + ResourceId baseDir = + FileSystems.matchNewResource( + Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations") + .toString(), + true); + + List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab"); + List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size()); + Map<String, String> schemaMap = Maps.newHashMap(); + for (String element : elements) { + String prefix = element.substring(0, 1); + String jsonSchema = schemaFromPrefix(prefix); + schemaMap.put(prefix, jsonSchema); + expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema))); + } + PCollectionView<Map<String, String>> schemaView = + writePipeline + .apply("createSchemaView", Create.of(schemaMap)) + .apply(View.<String, String>asMap()); + + PCollection<String> input = + writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of())); + input.apply( + AvroIO.<String>writeCustomTypeToGenericRecords() + .to(new TestDynamicDestinations(baseDir, schemaView)) + .withoutSharding() + .withTempDirectory(baseDir)); + writePipeline.run(); + + // Validate that the data written matches the expected elements in the expected order. 
+ + List<String> prefixes = Lists.newArrayList(); + for (String element : elements) { + prefixes.add(element.substring(0, 1)); + } + prefixes = ImmutableSet.copyOf(prefixes).asList(); + + List<GenericRecord> actualElements = new ArrayList<>(); + for (String prefix : prefixes) { + File expectedFile = + new File( + baseDir + .resolve( + "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE) + .toString()); + assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists()); + Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix)); + try (DataFileReader<GenericRecord> reader = + new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) { + Iterators.addAll(actualElements, reader); + } + expectedFile.delete(); + } + assertThat(actualElements, containsInAnyOrder(expectedElements.toArray())); + } + @Test public void testWriteWithDefaultCodec() throws Exception { AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz"); - assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString()); + assertEquals(CodecFactory.deflateCodec(6).toString(), write.inner.getCodec().toString()); } @Test public void testWriteWithCustomCodec() throws Exception { AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec()); - assertEquals(SNAPPY_CODEC, write.getCodec().toString()); + assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString()); } @Test @@ -556,7 +693,7 @@ public class AvroIOTest { assertEquals( CodecFactory.deflateCodec(9).toString(), - SerializableUtils.clone(write.getCodec()).getCodec().toString()); + SerializableUtils.clone(write.inner.getCodec()).getCodec().toString()); } @Test @@ -567,7 +704,7 @@ public class AvroIOTest { assertEquals( CodecFactory.xzCodec(9).toString(), - SerializableUtils.clone(write.getCodec()).getCodec().toString()); + 
SerializableUtils.clone(write.inner.getCodec()).getCodec().toString()); } @Test @@ -618,7 +755,8 @@ public class AvroIOTest { String shardNameTemplate = firstNonNull( - write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + write.inner.getShardTemplate(), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate); } @@ -710,7 +848,13 @@ public class AvroIOTest { assertThat(displayData, hasDisplayItem("filePrefix", "/foo")); assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); - assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); + assertThat( + displayData, + hasDisplayItem( + "schema", + "{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io" + + ".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"}," + + "{\"name\":\"stringField\",\"type\":\"string\"}]}")); assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString())); } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index a6ad746..ff30e33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -231,7 +231,7 @@ public class FileBasedSinkTest { SimpleSink.makeSimpleSink( getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED); - WriteOperation<String, Void> writeOp = + WriteOperation<Void, String> writeOp = new 
SimpleSink.SimpleWriteOperation<>(sink, tempDirectory); List<File> temporaryFiles = new ArrayList<>(); @@ -482,11 +482,11 @@ public class FileBasedSinkTest { public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception { final String testUid = "testId"; ResourceId root = getBaseOutputDirectory(); - WriteOperation<String, Void> writeOp = + WriteOperation<Void, String> writeOp = SimpleSink.makeSimpleSink( root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()) .createWriteOperation(); - final Writer<String, Void> writer = writeOp.createWriter(); + final Writer<Void, String> writer = writeOp.createWriter(); final ResourceId expectedFile = writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE); http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index 9196178..382898d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -28,10 +28,10 @@ import org.apache.beam.sdk.util.MimeTypes; /** * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer. 
*/ -class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> { +class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, String> { public SimpleSink( ResourceId tempDirectory, - DynamicDestinations<String, DestinationT> dynamicDestinations, + DynamicDestinations<String, DestinationT, String> dynamicDestinations, WritableByteChannelFactory writableByteChannelFactory) { super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory); } @@ -50,7 +50,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> { String shardTemplate, String suffix, WritableByteChannelFactory writableByteChannelFactory) { - DynamicDestinations<String, Void> dynamicDestinations = + DynamicDestinations<String, Void, String> dynamicDestinations = DynamicFileDestinations.constant( DefaultFilenamePolicy.fromParams( new Params() @@ -67,7 +67,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> { } static final class SimpleWriteOperation<DestinationT> - extends WriteOperation<String, DestinationT> { + extends WriteOperation<DestinationT, String> { public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) { super(sink, tempOutputDirectory); } @@ -82,7 +82,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> { } } - static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> { + static final class SimpleWriter<DestinationT> extends Writer<DestinationT, String> { static final String HEADER = "header"; static final String FOOTER = "footer"; http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java index a73ed7d..7f80c26 
100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java @@ -110,7 +110,8 @@ public class TextIOWriteTest { }); } - static class TestDynamicDestinations extends FileBasedSink.DynamicDestinations<String, String> { + static class TestDynamicDestinations + extends FileBasedSink.DynamicDestinations<String, String, String> { ResourceId baseDir; TestDynamicDestinations(ResourceId baseDir) { @@ -118,6 +119,11 @@ public class TextIOWriteTest { } @Override + public String formatRecord(String record) { + return record; + } + + @Override public String getDestination(String element) { // Destination is based on first character of string. return element.substring(0, 1); @@ -169,10 +175,7 @@ public class TextIOWriteTest { List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab"); PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of())); - input.apply( - TextIO.write() - .to(new TestDynamicDestinations(baseDir)) - .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); + input.apply(TextIO.write().to(new TestDynamicDestinations(baseDir)).withTempDirectory(baseDir)); p.run(); assertOutputFiles( @@ -268,8 +271,14 @@ public class TextIOWriteTest { new UserWriteType("caab", "sixth")); PCollection<UserWriteType> input = p.apply(Create.of(elements)); input.apply( - TextIO.writeCustomType(new SerializeUserWrite()) - .to(new UserWriteDestination(baseDir), new DefaultFilenamePolicy.Params()) + TextIO.<UserWriteType>writeCustomType() + .to( + new UserWriteDestination(baseDir), + new DefaultFilenamePolicy.Params() + .withBaseFilename( + baseDir.resolve( + "empty", ResolveOptions.StandardResolveOptions.RESOLVE_FILE))) + .withFormatFunction(new SerializeUserWrite()) .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); p.run();
