http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java deleted file mode 100644 index be3a415..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedSource.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.io; - -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; - -import org.joda.time.Instant; - -import java.io.IOException; -import java.util.List; -import java.util.NoSuchElementException; - -/** - * A {@link Source} that reads a finite amount of input and, because of that, supports - * some additional operations. 
- * - * <p>The operations are: - * <ul> - * <li>Splitting into bundles of given size: {@link #splitIntoBundles}; - * <li>Size estimation: {@link #getEstimatedSizeBytes}; - * <li>Telling whether or not this source produces key/value pairs in sorted order: - * {@link #producesSortedKeys}; - * <li>The reader ({@link BoundedReader}) supports progress estimation - * ({@link BoundedReader#getFractionConsumed}) and dynamic splitting - * ({@link BoundedReader#splitAtFraction}). - * </ul> - * - * <p>To use this class for supporting your custom input type, derive your class - * class from it, and override the abstract methods. For an example, see {@link DatastoreIO}. - * - * @param <T> Type of records read by the source. - */ -public abstract class BoundedSource<T> extends Source<T> { - /** - * Splits the source into bundles of approximately {@code desiredBundleSizeBytes}. - */ - public abstract List<? extends BoundedSource<T>> splitIntoBundles( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception; - - /** - * An estimate of the total size (in bytes) of the data that would be read from this source. - * This estimate is in terms of external storage size, before any decompression or other - * processing done by the reader. - */ - public abstract long getEstimatedSizeBytes(PipelineOptions options) throws Exception; - - /** - * Whether this source is known to produce key/value pairs sorted by lexicographic order on - * the bytes of the encoded key. - */ - public abstract boolean producesSortedKeys(PipelineOptions options) throws Exception; - - /** - * Returns a new {@link BoundedReader} that reads from this source. - */ - public abstract BoundedReader<T> createReader(PipelineOptions options) throws IOException; - - /** - * A {@code Reader} that reads a bounded amount of input and supports some additional - * operations, such as progress estimation and dynamic work rebalancing. 
- * - * <h3>Boundedness</h3> - * <p>Once {@link #start} or {@link #advance} has returned false, neither will be called - * again on this object. - * - * <h3>Thread safety</h3> - * All methods will be run from the same thread except {@link #splitAtFraction}, - * {@link #getFractionConsumed} and {@link #getCurrentSource}, which can be called concurrently - * from a different thread. There will not be multiple concurrent calls to - * {@link #splitAtFraction} but there can be for {@link #getFractionConsumed} if - * {@link #splitAtFraction} is implemented. - * - * <p>If the source does not implement {@link #splitAtFraction}, you do not need to worry about - * thread safety. If implemented, it must be safe to call {@link #splitAtFraction} and - * {@link #getFractionConsumed} concurrently with other methods. - * - * <p>Additionally, a successful {@link #splitAtFraction} call must, by definition, cause - * {@link #getCurrentSource} to start returning a different value. - * Callers of {@link #getCurrentSource} need to be aware of the possibility that the returned - * value can change at any time, and must only access the properties of the source returned by - * {@link #getCurrentSource} which do not change between {@link #splitAtFraction} calls. - * - * <h3>Implementing {@link #splitAtFraction}</h3> - * In the course of dynamic work rebalancing, the method {@link #splitAtFraction} - * may be called concurrently with {@link #advance} or {@link #start}. It is critical that - * their interaction is implemented in a thread-safe way, otherwise data loss is possible. - * - * <p>Sources which support dynamic work rebalancing should use - * {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to manage the (source-specific) - * range of positions that is being split. If your source supports dynamic work rebalancing, - * please use that class to implement it if possible; if not possible, please contact the team - * at <i>[email protected]</i>. 
- */ - @Experimental(Experimental.Kind.SOURCE_SINK) - public abstract static class BoundedReader<T> extends Source.Reader<T> { - /** - * Returns a value in [0, 1] representing approximately what fraction of the - * {@link #getCurrentSource current source} this reader has read so far, or {@code null} if such - * an estimate is not available. - * - * <p>It is recommended that this method should satisfy the following properties: - * <ul> - * <li>Should return 0 before the {@link #start} call. - * <li>Should return 1 after a {@link #start} or {@link #advance} call that returns false. - * <li>The returned values should be non-decreasing (though they don't have to be unique). - * </ul> - * - * <p>By default, returns null to indicate that this cannot be estimated. - * - * <h5>Thread safety</h5> - * If {@link #splitAtFraction} is implemented, this method can be called concurrently to other - * methods (including itself), and it is therefore critical for it to be implemented - * in a thread-safe way. - */ - public Double getFractionConsumed() { - return null; - } - - /** - * Returns a {@code Source} describing the same input that this {@code Reader} currently reads - * (including items already read). - * - * <h3>Usage</h3> - * <p>Reader subclasses can use this method for convenience to access unchanging properties of - * the source being read. Alternatively, they can cache these properties in the constructor. - * <p>The framework will call this method in the course of dynamic work rebalancing, e.g. after - * a successful {@link BoundedSource.BoundedReader#splitAtFraction} call. - * - * <h3>Mutability and thread safety</h3> - * Remember that {@link Source} objects must always be immutable. However, the return value of - * this function may be affected by dynamic work rebalancing, happening asynchronously via - * {@link BoundedSource.BoundedReader#splitAtFraction}, meaning it can return a different - * {@link Source} object. 
However, the returned object itself will still itself be immutable. - * Callers must take care not to rely on properties of the returned source that may be - * asynchronously changed as a result of this process (e.g. do not cache an end offset when - * reading a file). - * - * <h3>Implementation</h3> - * For convenience, subclasses should usually return the most concrete subclass of - * {@link Source} possible. - * In practice, the implementation of this method should nearly always be one of the following: - * <ul> - * <li>Source that inherits from a base class that already implements - * {@link #getCurrentSource}: delegate to base class. In this case, it is almost always - * an error for the subclass to maintain its own copy of the source. - * <pre>{@code - * public FooReader(FooSource<T> source) { - * super(source); - * } - * - * public FooSource<T> getCurrentSource() { - * return (FooSource<T>)super.getCurrentSource(); - * } - * }</pre> - * <li>Source that does not support dynamic work rebalancing: return a private final variable. - * <pre>{@code - * private final FooSource<T> source; - * - * public FooReader(FooSource<T> source) { - * this.source = source; - * } - * - * public FooSource<T> getCurrentSource() { - * return source; - * } - * }</pre> - * <li>{@link BoundedSource.BoundedReader} that explicitly supports dynamic work rebalancing: - * maintain a variable pointing to an immutable source object, and protect it with - * synchronization. - * <pre>{@code - * private FooSource<T> source; - * - * public FooReader(FooSource<T> source) { - * this.source = source; - * } - * - * public synchronized FooSource<T> getCurrentSource() { - * return source; - * } - * - * public synchronized FooSource<T> splitAtFraction(double fraction) { - * ... 
- * FooSource<T> primary = ...; - * FooSource<T> residual = ...; - * this.source = primary; - * return residual; - * } - * }</pre> - * </ul> - */ - @Override - public abstract BoundedSource<T> getCurrentSource(); - - /** - * Tells the reader to narrow the range of the input it's going to read and give up - * the remainder, so that the new range would contain approximately the given - * fraction of the amount of data in the current range. - * - * <p>Returns a {@code BoundedSource} representing the remainder. - * - * <h5>Detailed description</h5> - * Assuming the following sequence of calls: - * <pre>{@code - * BoundedSource<T> initial = reader.getCurrentSource(); - * BoundedSource<T> residual = reader.splitAtFraction(fraction); - * BoundedSource<T> primary = reader.getCurrentSource(); - * }</pre> - * <ul> - * <li> The "primary" and "residual" sources, when read, should together cover the same - * set of records as "initial". - * <li> The current reader should continue to be in a valid state, and continuing to read - * from it should, together with the records it already read, yield the same records - * as would have been read by "primary". - * <li> The amount of data read by "primary" should ideally represent approximately - * the given fraction of the amount of data read by "initial". - * </ul> - * For example, a reader that reads a range of offsets <i>[A, B)</i> in a file might implement - * this method by truncating the current range to <i>[A, A + fraction*(B-A))</i> and returning - * a Source representing the range <i>[A + fraction*(B-A), B)</i>. - * - * <p>This method should return {@code null} if the split cannot be performed for this fraction - * while satisfying the semantics above. E.g., a reader that reads a range of offsets - * in a file should return {@code null} if it is already past the position in its range - * corresponding to the given fraction. 
In this case, the method MUST have no effect - * (the reader must behave as if the method hadn't been called at all). - * - * <h5>Statefulness</h5> - * Since this method (if successful) affects the reader's source, in subsequent invocations - * "fraction" should be interpreted relative to the new current source. - * - * <h5>Thread safety and blocking</h5> - * This method will be called concurrently to other methods (however there will not be multiple - * concurrent invocations of this method itself), and it is critical for it to be implemented - * in a thread-safe way (otherwise data loss is possible). - * - * <p>It is also very important that this method always completes quickly. In particular, - * it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating - * this requirement may stall completion of the work item or even cause it to fail. - * - * <p>It is incorrect to make both this method and {@link #start}/{@link #advance} - * {@code synchronized}, because those methods can perform blocking operations, and then - * this method would have to wait for those calls to complete. - * - * <p>{@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} makes it easy to implement - * this method safely and correctly. - * - * <p>By default, returns null to indicate that splitting is not possible. - */ - public BoundedSource<T> splitAtFraction(double fraction) { - return null; - } - - /** - * By default, returns the minimum possible timestamp. - */ - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return BoundedWindow.TIMESTAMP_MIN_VALUE; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java deleted file mode 100644 index e3dca91..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.io; - -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.common.base.Preconditions; -import com.google.common.io.ByteStreams; -import com.google.common.primitives.Ints; - -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; - -import java.io.IOException; -import java.io.PushbackInputStream; -import java.io.Serializable; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.util.NoSuchElementException; -import java.util.zip.GZIPInputStream; - -/** - * A Source that reads from compressed files. 
A {@code CompressedSources} wraps a delegate - * {@link FileBasedSource} that is able to read the decompressed file format. - * - * <p>For example, use the following to read from a gzip-compressed XML file: - * - * <pre> {@code - * XmlSource mySource = XmlSource.from(...); - * PCollection<T> collection = p.apply(Read.from(CompressedSource - * .from(mySource) - * .withDecompression(CompressedSource.CompressionMode.GZIP))); - * } </pre> - * - * <p>Supported compression algorithms are {@link CompressionMode#GZIP} and - * {@link CompressionMode#BZIP2}. User-defined compression types are supported by implementing - * {@link DecompressingChannelFactory}. - * - * <p>By default, the compression algorithm is selected from those supported in - * {@link CompressionMode} based on the file name provided to the source, namely - * {@code ".bz2"} indicates {@link CompressionMode#BZIP2} and {@code ".gz"} indicates - * {@link CompressionMode#GZIP}. If the file name does not match any of the supported - * algorithms, it is assumed to be uncompressed data. - * - * @param <T> The type to read from the compressed file. - */ -@Experimental(Experimental.Kind.SOURCE_SINK) -public class CompressedSource<T> extends FileBasedSource<T> { - /** - * Factory interface for creating channels that decompress the content of an underlying channel. - */ - public static interface DecompressingChannelFactory extends Serializable { - /** - * Given a channel, create a channel that decompresses the content read from the channel. - * @throws IOException - */ - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException; - } - - /** - * Factory interface for creating channels that decompress the content of an underlying channel, - * based on both the channel and the file name. 
- */ - private static interface FileNameBasedDecompressingChannelFactory - extends DecompressingChannelFactory { - /** - * Given a channel, create a channel that decompresses the content read from the channel. - * @throws IOException - */ - ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel) - throws IOException; - - /** - * Given a file name, returns true if the file name matches any supported compression - * scheme. - */ - boolean isCompressed(String fileName); - } - - /** - * Default compression types supported by the {@code CompressedSource}. - */ - public enum CompressionMode implements DecompressingChannelFactory { - /** - * Reads a byte channel assuming it is compressed with gzip. - */ - GZIP { - @Override - public boolean matches(String fileName) { - return fileName.toLowerCase().endsWith(".gz"); - } - - @Override - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException { - // Determine if the input stream is gzipped. The input stream returned from the - // GCS connector may already be decompressed; GCS does this based on the - // content-encoding property. - PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2); - byte[] headerBytes = new byte[2]; - int bytesRead = ByteStreams.read( - stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */); - stream.unread(headerBytes, 0, bytesRead); - if (bytesRead >= 2) { - byte zero = 0x00; - int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); - if (header == GZIPInputStream.GZIP_MAGIC) { - return Channels.newChannel(new GzipCompressorInputStream(stream)); - } - } - return Channels.newChannel(stream); - } - }, - - /** - * Reads a byte channel assuming it is compressed with bzip2. 
- */ - BZIP2 { - @Override - public boolean matches(String fileName) { - return fileName.toLowerCase().endsWith(".bz2"); - } - - @Override - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException { - return Channels.newChannel( - new BZip2CompressorInputStream(Channels.newInputStream(channel))); - } - }; - - /** - * Returns {@code true} if the given file name implies that the contents are compressed - * according to the compression embodied by this factory. - */ - public abstract boolean matches(String fileName); - - @Override - public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException; - } - - /** - * Reads a byte channel detecting compression according to the file name. If the filename - * is not any other known {@link CompressionMode}, it is presumed to be uncompressed. - */ - private static class DecompressAccordingToFilename - implements FileNameBasedDecompressingChannelFactory { - - @Override - public ReadableByteChannel createDecompressingChannel( - String fileName, ReadableByteChannel channel) throws IOException { - for (CompressionMode type : CompressionMode.values()) { - if (type.matches(fileName)) { - return type.createDecompressingChannel(channel); - } - } - // Uncompressed - return channel; - } - - @Override - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) { - throw new UnsupportedOperationException( - String.format("%s does not support createDecompressingChannel(%s) but only" - + " createDecompressingChannel(%s,%s)", - getClass().getSimpleName(), - String.class.getSimpleName(), - ReadableByteChannel.class.getSimpleName(), - ReadableByteChannel.class.getSimpleName())); - } - - @Override - public boolean isCompressed(String fileName) { - for (CompressionMode type : CompressionMode.values()) { - if (type.matches(fileName)) { - return true; - } - } - return false; - } - } - - private final FileBasedSource<T> 
sourceDelegate; - private final DecompressingChannelFactory channelFactory; - - /** - * Creates a {@link Read} transform that reads from that reads from the underlying - * {@link FileBasedSource} {@code sourceDelegate} after decompressing it with a {@link - * DecompressingChannelFactory}. - */ - public static <T> Read.Bounded<T> readFromSource( - FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) { - return Read.from(new CompressedSource<>(sourceDelegate, channelFactory)); - } - - /** - * Creates a {@code CompressedSource} from an underlying {@code FileBasedSource}. The type - * of compression used will be based on the file name extension unless explicitly - * configured via {@link CompressedSource#withDecompression}. - */ - public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) { - return new CompressedSource<>(sourceDelegate, new DecompressAccordingToFilename()); - } - - /** - * Return a {@code CompressedSource} that is like this one but will decompress its underlying file - * with the given {@link DecompressingChannelFactory}. - */ - public CompressedSource<T> withDecompression(DecompressingChannelFactory channelFactory) { - return new CompressedSource<>(this.sourceDelegate, channelFactory); - } - - /** - * Creates a {@code CompressedSource} from a delegate file based source and a decompressing - * channel factory. - */ - private CompressedSource( - FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) { - super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE); - this.sourceDelegate = sourceDelegate; - this.channelFactory = channelFactory; - } - - /** - * Creates a {@code CompressedSource} for an individual file. Used by {@link - * CompressedSource#createForSubrangeOfFile}. 
- */ - private CompressedSource(FileBasedSource<T> sourceDelegate, - DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize, - long startOffset, long endOffset) { - super(filePatternOrSpec, minBundleSize, startOffset, endOffset); - Preconditions.checkArgument( - startOffset == 0, - "CompressedSources must start reading at offset 0. Requested offset: " + startOffset); - this.sourceDelegate = sourceDelegate; - this.channelFactory = channelFactory; - } - - /** - * Validates that the delegate source is a valid source and that the channel factory is not null. - */ - @Override - public void validate() { - super.validate(); - Preconditions.checkNotNull(sourceDelegate); - sourceDelegate.validate(); - Preconditions.checkNotNull(channelFactory); - } - - /** - * Creates a {@code CompressedSource} for a subrange of a file. Called by superclass to create a - * source for a single file. - */ - @Override - protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) { - return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end), - channelFactory, fileName, Long.MAX_VALUE, start, end); - } - - /** - * Determines whether a single file represented by this source is splittable. Returns true - * if we are using the default decompression factory and and it determines - * from the requested file name that the file is not compressed. - */ - @Override - protected final boolean isSplittable() throws Exception { - if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { - FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = - (FileNameBasedDecompressingChannelFactory) channelFactory; - return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec()); - } - return true; - } - - /** - * Creates a {@code FileBasedReader} to read a single file. - * - * <p>Uses the delegate source to create a single file reader for the delegate source. 
- * Utilizes the default decompression channel factory to not wrap the source reader - * if the file name does not represent a compressed file allowing for splitting of - * the source. - */ - @Override - protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) { - if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { - FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = - (FileNameBasedDecompressingChannelFactory) channelFactory; - if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) { - return sourceDelegate.createSingleFileReader(options); - } - } - return new CompressedReader<T>( - this, sourceDelegate.createSingleFileReader(options)); - } - - /** - * Returns whether the delegate source produces sorted keys. - */ - @Override - public final boolean producesSortedKeys(PipelineOptions options) throws Exception { - return sourceDelegate.producesSortedKeys(options); - } - - /** - * Returns the delegate source's default output coder. - */ - @Override - public final Coder<T> getDefaultOutputCoder() { - return sourceDelegate.getDefaultOutputCoder(); - } - - public final DecompressingChannelFactory getChannelFactory() { - return channelFactory; - } - - /** - * Reader for a {@link CompressedSource}. Decompresses its input and uses a delegate - * reader to read elements from the decompressed input. - * @param <T> The type of records read from the source. - */ - public static class CompressedReader<T> extends FileBasedReader<T> { - - private final FileBasedReader<T> readerDelegate; - private final CompressedSource<T> source; - private int numRecordsRead; - - /** - * Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader. - */ - public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) { - super(source); - this.source = source; - this.readerDelegate = readerDelegate; - } - - /** - * Gets the current record from the delegate reader. 
- */ - @Override - public T getCurrent() throws NoSuchElementException { - return readerDelegate.getCurrent(); - } - - /** - * Returns true only for the first record; compressed sources cannot be split. - */ - @Override - protected final boolean isAtSplitPoint() { - // We have to return true for the first record, but not for the state before reading it, - // and not for the state after reading any other record. Hence == rather than >= or <=. - // This is required because FileBasedReader is intended for readers that can read a range - // of offsets in a file and where the range can be split in parts. CompressedReader, - // however, is a degenerate case because it cannot be split, but it has to satisfy the - // semantics of offsets and split points anyway. - return numRecordsRead == 1; - } - - /** - * Creates a decompressing channel from the input channel and passes it to its delegate reader's - * {@link FileBasedReader#startReading(ReadableByteChannel)}. - */ - @Override - protected final void startReading(ReadableByteChannel channel) throws IOException { - if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) { - FileNameBasedDecompressingChannelFactory channelFactory = - (FileNameBasedDecompressingChannelFactory) source.getChannelFactory(); - readerDelegate.startReading(channelFactory.createDecompressingChannel( - getCurrentSource().getFileOrPatternSpec(), - channel)); - } else { - readerDelegate.startReading(source.getChannelFactory().createDecompressingChannel( - channel)); - } - } - - /** - * Reads the next record via the delegate reader. - */ - @Override - protected final boolean readNextRecord() throws IOException { - if (!readerDelegate.readNextRecord()) { - return false; - } - ++numRecordsRead; - return true; - } - - /** - * Returns the delegate reader's current offset in the decompressed input. 
- */ - @Override - protected final long getCurrentOffset() { - return readerDelegate.getCurrentOffset(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java deleted file mode 100644 index 07609ba..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.io; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn; -import com.google.cloud.dataflow.sdk.io.Read.Unbounded; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.values.PBegin; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; -import com.google.common.base.Optional; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A {@link PTransform} that produces longs. When used to produce a - * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0} - * and counts up to a specified maximum. When used to produce an - * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE} - * and then never produces more output. (In practice, this limit should never be reached.) - * - * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and - * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it - * supports dynamic work rebalancing. - * - * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}: - * - * <pre>{@code - * Pipeline p = ... - * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000); - * PCollection<Long> bounded = p.apply(producer); - * }</pre> - * - * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()}, - * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values - * with timestamps other than {@link Instant#now}. - * - * <pre>{@code - * Pipeline p = ... 
- * - * // To create an unbounded producer that uses processing time as the element timestamp. - * PCollection<Long> unbounded = p.apply(CountingInput.unbounded()); - * // Or, to create an unbounded source that uses a provided function to set the element timestamp. - * PCollection<Long> unboundedWithTimestamps = - * p.apply(CountingInput.unbounded().withTimestampFn(someFn)); - * }</pre> - */ -public class CountingInput { - /** - * Creates a {@link BoundedCountingInput} that will produce the specified number of elements, - * from {@code 0} to {@code numElements - 1}. - */ - public static BoundedCountingInput upTo(long numElements) { - checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); - return new BoundedCountingInput(numElements); - } - - /** - * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up - * to {@link Long#MAX_VALUE}. - * - * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this - * limit should never be reached.) - * - * <p>Elements in the resulting {@link PCollection PCollection<Long>} will by default have - * timestamps corresponding to processing time at element generation, provided by - * {@link Instant#now}. Use the transform returned by - * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output - * timestamps. - */ - public static UnboundedCountingInput unbounded() { - return new UnboundedCountingInput( - new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent()); - } - - /** - * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from - * 0. 
- */ - public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> { - private final long numElements; - - private BoundedCountingInput(long numElements) { - this.numElements = numElements; - } - - @SuppressWarnings("deprecation") - @Override - public PCollection<Long> apply(PBegin begin) { - return begin.apply(Read.from(CountingSource.upTo(numElements))); - } - } - - /** - * A {@link PTransform} that will produce numbers starting from {@code 0} up to - * {@link Long#MAX_VALUE}. - * - * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this - * limit should never be reached.) - * - * <p>Elements in the resulting {@link PCollection PCollection<Long>} will by default have - * timestamps corresponding to processing time at element generation, provided by - * {@link Instant#now}. Use the transform returned by - * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output - * timestamps. - */ - public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> { - private final SerializableFunction<Long, Instant> timestampFn; - private final Optional<Long> maxNumRecords; - private final Optional<Duration> maxReadTime; - - private UnboundedCountingInput( - SerializableFunction<Long, Instant> timestampFn, - Optional<Long> maxNumRecords, - Optional<Duration> maxReadTime) { - this.timestampFn = timestampFn; - this.maxNumRecords = maxNumRecords; - this.maxReadTime = maxReadTime; - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the - * timestamp specified by the timestampFn. - * - * <p>Note that the timestamps produced by {@code timestampFn} may not decrease. 
- */ - public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) { - return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime); - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the - * specified number of elements. - * - * <p>A bounded amount of elements will be produced by the result transform, and the result - * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}. - */ - public UnboundedCountingInput withMaxNumRecords(long maxRecords) { - checkArgument( - maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords); - return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime); - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the - * specified amount of time. - * - * <p>A bounded amount of elements will be produced by the result transform, and the result - * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}. 
- */ - public UnboundedCountingInput withMaxReadTime(Duration readTime) { - checkNotNull(readTime, "ReadTime cannot be null"); - return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime)); - } - - @SuppressWarnings("deprecation") - @Override - public PCollection<Long> apply(PBegin begin) { - Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn)); - if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) { - return begin.apply(read); - } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) { - return begin.apply(read.withMaxNumRecords(maxNumRecords.get())); - } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) { - return begin.apply(read.withMaxReadTime(maxReadTime.get())); - } else { - return begin.apply( - read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get())); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java deleted file mode 100644 index 412f3a7..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package com.google.cloud.dataflow.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableList;

import org.joda.time.Instant;

import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * A source that produces longs. When used as a {@link BoundedSource}, {@link CountingSource}
 * starts at {@code 0} and counts up to a specified maximum. When used as an
 * {@link UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then never produces more
 * output. (In practice, this limit should never be reached.)
 *
 * <p>The bounded {@link CountingSource} is implemented based on {@link OffsetBasedSource} and
 * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
 * supports dynamic work rebalancing.
 *
 * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingSource#upTo(long)}:
 *
 * <pre>{@code
 * Pipeline p = ...
 * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
 * PCollection<Long> bounded = p.apply(producer);
 * }</pre>
 *
 * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
 * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
 * with timestamps other than {@link Instant#now}.
 *
 * <pre>{@code
 * Pipeline p = ...
 *
 * // To create an unbounded PCollection that uses processing time as the element timestamp.
 * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
 * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
 * PCollection<Long> unboundedWithTimestamps =
 *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
 *
 * }</pre>
 */
public class CountingSource {
  /**
   * Creates a {@link BoundedSource} that will produce the specified number of elements,
   * from {@code 0} to {@code numElements - 1}.
   *
   * @deprecated use {@link CountingInput#upTo(long)} instead
   */
  @Deprecated
  public static BoundedSource<Long> upTo(long numElements) {
    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
    return new BoundedCountingSource(0, numElements);
  }

  /**
   * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
   * {@link Long#MAX_VALUE}.
   *
   * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
   * limit should never be reached.)
   *
   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will have timestamps
   * corresponding to processing time at element generation, provided by {@link Instant#now}.
   *
   * @deprecated use {@link CountingInput#unbounded()} instead
   */
  @Deprecated
  public static UnboundedSource<Long, CounterMark> unbounded() {
    return unboundedWithTimestampFn(new NowTimestampFn());
  }

  /**
   * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
   * {@link Long#MAX_VALUE}, with element timestamps supplied by the specified function.
   *
   * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
   * limit should never be reached.)
   *
   * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
   *
   * @deprecated use {@link CountingInput#unbounded()} and call
   *     {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
   */
  @Deprecated
  public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
      SerializableFunction<Long, Instant> timestampFn) {
    return new UnboundedCountingSource(0, 1, timestampFn);
  }

  /////////////////////////////////////////////////////////////////////////////////////////////

  /** Prevent instantiation: this class is a namespace for static factories only. */
  private CountingSource() {}

  /**
   * A function that returns {@link Instant#now} as the timestamp for each generated element.
   */
  static class NowTimestampFn implements SerializableFunction<Long, Instant> {
    @Override
    public Instant apply(Long input) {
      return Instant.now();
    }
  }

  /**
   * An implementation of {@link CountingSource} that produces a bounded {@link PCollection}.
   * It is implemented on top of {@link OffsetBasedSource} (with associated reader
   * {@link BoundedCountingReader}) and performs efficient initial splitting and supports dynamic
   * work rebalancing.
   */
  private static class BoundedCountingSource extends OffsetBasedSource<Long> {
    /**
     * Creates a {@link BoundedCountingSource} that generates the numbers in the specified
     * {@code [start, end)} range.
     */
    public BoundedCountingSource(long start, long end) {
      super(start, end, 1 /* can be split every 1 offset */);
    }

    ////////////////////////////////////////////////////////////////////////////////////////////

    @Override
    public long getBytesPerOffset() {
      // Each element is a long, encoded in at most 8 bytes.
      return 8;
    }

    @Override
    public long getMaxEndOffset(PipelineOptions options) throws Exception {
      return getEndOffset();
    }

    @Override
    public OffsetBasedSource<Long> createSourceForSubrange(long start, long end) {
      return new BoundedCountingSource(start, end);
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
      // Output is a strictly increasing sequence of longs.
      return true;
    }

    @Override
    public com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader<Long> createReader(
        PipelineOptions options) throws IOException {
      return new BoundedCountingReader(this);
    }

    @Override
    public Coder<Long> getDefaultOutputCoder() {
      return VarLongCoder.of();
    }
  }

  /**
   * The reader associated with {@link BoundedCountingSource}.
   *
   * @see BoundedCountingSource
   */
  private static class BoundedCountingReader extends OffsetBasedSource.OffsetBasedReader<Long> {
    // The value most recently produced; also serves as the current offset.
    private long current;

    public BoundedCountingReader(OffsetBasedSource<Long> source) {
      super(source);
    }

    @Override
    protected long getCurrentOffset() throws NoSuchElementException {
      return current;
    }

    @Override
    public synchronized BoundedCountingSource getCurrentSource() {
      return (BoundedCountingSource) super.getCurrentSource();
    }

    @Override
    public Long getCurrent() throws NoSuchElementException {
      return current;
    }

    @Override
    protected boolean startImpl() throws IOException {
      current = getCurrentSource().getStartOffset();
      return true;
    }

    @Override
    protected boolean advanceImpl() throws IOException {
      // Range-end checking is performed by OffsetBasedReader using getCurrentOffset().
      current++;
      return true;
    }

    @Override
    public void close() throws IOException {}
  }

  /**
   * An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
   */
  private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
    /** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
    private final long start;
    /** The interval (> 0) between numbers generated by this {@link UnboundedCountingSource}. */
    private final long stride;
    /** The function used to produce timestamps for the generated elements. */
    private final SerializableFunction<Long, Instant> timestampFn;

    /**
     * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
     * {@link Long#MAX_VALUE}, with element timestamps supplied by the specified function.
     *
     * <p>After {@link Long#MAX_VALUE}, the source never produces more output. (In practice, this
     * limit should never be reached.)
     *
     * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
     */
    public UnboundedCountingSource(
        long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
      this.start = start;
      this.stride = stride;
      this.timestampFn = timestampFn;
    }

    /**
     * Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
     * {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
     * produces.
     *
     * <p>E.g., if a source produces all even numbers {@code [0, 2, 4, 6, 8, ...)} and we want to
     * split into 3 new sources, then the new sources will produce numbers that are 6 apart and
     * are offset at the start by the original stride: {@code [0, 6, 12, ...)},
     * {@code [2, 8, 14, ...)}, and {@code [4, 10, 16, ...)}.
     */
    @Override
    public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generateInitialSplits(
        int desiredNumSplits, PipelineOptions options) throws Exception {
      // Using Javadoc example, stride 2 with 3 splits becomes stride 6.
      long newStride = stride * desiredNumSplits;

      ImmutableList.Builder<UnboundedCountingSource> splits = ImmutableList.builder();
      for (int i = 0; i < desiredNumSplits; ++i) {
        // Starts offset by the original stride. Using Javadoc example, this generates starts of
        // 0, 2, and 4.
        splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
      }
      return splits.build();
    }

    @Override
    public UnboundedReader<Long> createReader(
        PipelineOptions options, CounterMark checkpointMark) {
      return new UnboundedCountingReader(this, checkpointMark);
    }

    @Override
    public Coder<CountingSource.CounterMark> getCheckpointMarkCoder() {
      return AvroCoder.of(CountingSource.CounterMark.class);
    }

    @Override
    public void validate() {
      // Enforce the invariants documented on the fields at pipeline-construction time;
      // previously validate() was a no-op, so a bad start/stride/timestampFn would only
      // surface as wrong output or an NPE during execution.
      checkArgument(start >= 0, "start (%s) must be non-negative", start);
      checkArgument(stride > 0, "stride (%s) must be positive", stride);
      checkArgument(timestampFn != null, "timestampFn must not be null");
    }

    @Override
    public Coder<Long> getDefaultOutputCoder() {
      return VarLongCoder.of();
    }
  }

  /**
   * The reader associated with {@link UnboundedCountingSource}.
   *
   * @see UnboundedCountingSource
   */
  private static class UnboundedCountingReader extends UnboundedReader<Long> {
    private UnboundedCountingSource source;
    private long current;
    private Instant currentTimestamp;

    public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
      this.source = source;
      if (mark == null) {
        // Because we have not emitted an element yet, and start() calls advance, we need to
        // "un-advance" so that start() produces the correct output.
        this.current = source.start - source.stride;
      } else {
        this.current = mark.getLastEmitted();
      }
    }

    @Override
    public boolean start() throws IOException {
      return advance();
    }

    @Override
    public boolean advance() throws IOException {
      // Overflow-safe check that (current + source.stride) <= Long.MAX_VALUE. Else, stop
      // producing.
      if (Long.MAX_VALUE - source.stride < current) {
        return false;
      }
      current += source.stride;
      currentTimestamp = source.timestampFn.apply(current);
      return true;
    }

    @Override
    public Instant getWatermark() {
      // All elements <= current have been emitted, so the watermark is the timestamp of the
      // last emitted element.
      return source.timestampFn.apply(current);
    }

    @Override
    public CounterMark getCheckpointMark() {
      return new CounterMark(current);
    }

    @Override
    public UnboundedSource<Long, CounterMark> getCurrentSource() {
      return source;
    }

    @Override
    public Long getCurrent() throws NoSuchElementException {
      return current;
    }

    @Override
    public Instant getCurrentTimestamp() throws NoSuchElementException {
      return currentTimestamp;
    }

    @Override
    public void close() throws IOException {}
  }

  /**
   * The checkpoint for an unbounded {@link CountingSource} is simply the last value produced. The
   * associated source object encapsulates the information needed to produce the next value.
   */
  @DefaultCoder(AvroCoder.class)
  public static class CounterMark implements UnboundedSource.CheckpointMark {
    /** The last value emitted. */
    private final long lastEmitted;

    /**
     * Creates a checkpoint mark reflecting the last emitted value.
     */
    public CounterMark(long lastEmitted) {
      this.lastEmitted = lastEmitted;
    }

    /**
     * Returns the last value emitted by the reader.
     */
    public long getLastEmitted() {
      return lastEmitted;
    }

    /////////////////////////////////////////////////////////////////////////////////////

    @SuppressWarnings("unused") // For AvroCoder
    private CounterMark() {
      this.lastEmitted = 0L;
    }

    @Override
    public void finalizeCheckpoint() throws IOException {}
  }
}
