Repository: incubator-beam

Updated Branches:
  refs/heads/master f8e26be7d -> 75bfd781f
TextIO.Read: support ValueProvider

Add test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4ebdf0bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4ebdf0bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4ebdf0bf

Branch: refs/heads/master
Commit: 4ebdf0bfcd17ab5df9b5d7132507c49979392721
Parents: f8e26be
Author: sammcveety <sam.mcve...@gmail.com>
Authored: Sun Oct 16 18:37:51 2016 -0400
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Nov 1 09:43:53 2016 -0700

----------------------------------------------------------------------
 .../dataflow/internal/ReadTranslator.java       | 12 ++--
 .../DataflowPipelineTranslatorTest.java         | 28 ++++++++++
 .../apache/beam/sdk/io/CompressedSource.java    |  2 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java | 59 ++++++++++++++------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 43 +++++++++++---
 5 files changed, 116 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 83836c0..b3af165 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationCo
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.PValue;
@@ -50,10 +51,13 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
       // TODO: Move this validation out of translation once IOChannelUtils is portable
       // and can be reconstructed on the worker.
       if (source instanceof FileBasedSource) {
-        String filePatternOrSpec = ((FileBasedSource<?>) source).getFileOrPatternSpec();
-        context.getPipelineOptions()
-            .getPathValidator()
-            .validateInputFilePatternSupported(filePatternOrSpec);
+        ValueProvider<String> filePatternOrSpec =
+            ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
+        if (filePatternOrSpec.isAccessible()) {
+          context.getPipelineOptions()
+              .getPathValidator()
+              .validateInputFilePatternSupported(filePatternOrSpec.get());
+        }
       }
 
       context.addStep(transform, "ParallelRead");
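
The hunk above leans on the ValueProvider contract: get() may only be called once isAccessible() returns true, so any check that runs while the job graph is being built has to be guarded and otherwise deferred to execution time. A minimal sketch of that guard as a standalone helper, for illustration only — the helper name and the Consumer parameter are stand-ins for the inlined call to context.getPipelineOptions().getPathValidator() shown in the hunk:

import java.util.function.Consumer;

import org.apache.beam.sdk.options.ValueProvider;

/** Illustration only: guard construction-time use of a possibly runtime-bound value. */
class ValidateIfAccessibleSketch {
  static void maybeValidate(ValueProvider<String> spec, Consumer<String> validator) {
    if (spec.isAccessible()) {
      // The value is already bound (e.g. a StaticValueProvider wrapping a literal pattern),
      // so it is safe to look at it during pipeline construction.
      validator.accept(spec.get());
    }
    // Otherwise the pattern is only bound at execution time; validation is skipped here
    // and bad patterns surface when the source is expanded on the worker.
  }
}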

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 762844b..c925454 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -732,6 +733,33 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         Collections.<DataflowPackage>emptyList());
   }
 
+  private static class TestValueProvider implements ValueProvider<String>, Serializable {
+    @Override
+    public boolean isAccessible() {
+      return false;
+    }
+
+    @Override
+    public String get() {
+      throw new RuntimeException("Should not be called.");
+    }
+  }
+
+  @Test
+  public void testInaccessibleProvider() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Pipeline pipeline = Pipeline.create(options);
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+    pipeline.apply(TextIO.Read.from(new TestValueProvider()).withoutValidation());
+
+    // Check that translation does not fail.
+    t.translate(
+        pipeline,
+        (DataflowRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList());
+  }
+
   @Test
   public void testToSingletonTranslation() throws Exception {
     // A "change detector" test that makes sure the translation
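
TestValueProvider above models the runtime-bound case: the value is never accessible during translation, so translation must not call get(). The construction-time counterpart is StaticValueProvider, which the from(String) overload wraps around a literal pattern; a small hedged sketch of that contrast (plain JUnit, with a made-up bucket path):

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.junit.Test;

/** Illustration only: a static provider is usable at graph-construction time. */
public class StaticValueProviderSketchTest {
  @Test
  public void staticProviderIsAccessibleImmediately() {
    ValueProvider<String> spec = StaticValueProvider.of("gs://example-bucket/input-*.txt");
    assertTrue(spec.isAccessible());                             // value is already bound
    assertEquals("gs://example-bucket/input-*.txt", spec.get()); // so get() is safe here
  }
}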

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index e0f1b59..f33b9bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -301,7 +301,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    */
   private CompressedSource(
       FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE);
+    super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
     this.sourceDelegate = sourceDelegate;
     this.channelFactory = channelFactory;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index b073236..e0fc6b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -77,7 +79,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   // Package-private for testing.
   static final int THREAD_POOL_SIZE = 128;
 
-  private final String fileOrPatternSpec;
+  private final ValueProvider<String> fileOrPatternSpec;
   private final Mode mode;
 
   /**
@@ -99,6 +101,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    * @param minBundleSize minimum bundle size in bytes.
    */
   public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
+    this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
+  }
+
+  /**
+   * Create a {@code FileBasedSource} based on a file or a file pattern specification.
+   * Same as the {@code String} constructor, but accepting a {@link ValueProvider}
+   * to allow for runtime configuration of the source.
+   */
+  public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
     super(0, Long.MAX_VALUE, minBundleSize);
     mode = Mode.FILEPATTERN;
     this.fileOrPatternSpec = fileOrPatternSpec;
@@ -124,10 +135,14 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       long startOffset, long endOffset) {
     super(startOffset, endOffset, minBundleSize);
     mode = Mode.SINGLE_FILE_OR_SUBRANGE;
-    this.fileOrPatternSpec = fileName;
+    this.fileOrPatternSpec = StaticValueProvider.of(fileName);
   }
 
   public final String getFileOrPatternSpec() {
+    return fileOrPatternSpec.get();
+  }
+
+  public final ValueProvider<String> getFileOrPatternSpecProvider() {
     return fileOrPatternSpec;
   }
 
@@ -149,7 +164,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         end,
         getEndOffset());
 
-    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
+    checkState(fileOrPatternSpec.isAccessible(),
+               "Subrange creation should only happen at execution time.");
+    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
     if (start > 0 || end != Long.MAX_VALUE) {
       checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
           "Source created for the range [%s,%s) must be a subrange source", start, end);
@@ -186,12 +203,14 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // we perform the size estimation of files and file patterns using the interface provided by
     // IOChannelFactory.
 
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
     if (mode == Mode.FILEPATTERN) {
+      checkState(fileOrPatternSpec.isAccessible(),
+                 "Size estimation should be done at execution time.");
+      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
       // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
       long startTime = System.currentTimeMillis();
       long totalSize = 0;
-      Collection<String> inputs = factory.match(fileOrPatternSpec);
+      Collection<String> inputs = factory.match(fileOrPatternSpec.get());
       if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
         totalSize = getExactTotalSizeOfFiles(inputs, factory);
         LOG.debug("Size estimation of all files of pattern {} took {} ms",
@@ -274,7 +293,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())
+    String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
+        ? getFileOrPatternSpecProvider().get()
+        : getFileOrPatternSpecProvider().toString();
+    builder.add(DisplayData.item("filePattern", patternDisplay)
         .withLabel("File Pattern"));
   }
 
@@ -307,7 +329,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     ListeningExecutorService service =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
     try {
-      for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
+      checkState(fileOrPatternSpec.isAccessible(),
+                 "Bundle splitting should only happen at execution time.");
+      for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec.get())) {
         futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
       }
       List<? extends FileBasedSource<T>> splitResults =
@@ -346,8 +370,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // We split a file-based source into subranges only if the file is efficiently seekable.
     // If a file is not efficiently seekable it would be highly inefficient to create and read a
     // source based on a subrange of that file.
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    return factory.isReadSeekEfficient(fileOrPatternSpec);
+    checkState(fileOrPatternSpec.isAccessible(),
+               "isSplittable should only be called at runtime.");
+    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
+    return factory.isReadSeekEfficient(fileOrPatternSpec.get());
   }
 
   @Override
@@ -357,7 +383,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
+      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (String fileName : files) {
         long endOffset;
@@ -387,9 +413,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public String toString() {
     switch (mode) {
       case FILEPATTERN:
-        return fileOrPatternSpec;
+        return fileOrPatternSpec.toString();
       case SINGLE_FILE_OR_SUBRANGE:
-        return fileOrPatternSpec + " range " + super.toString();
+        return fileOrPatternSpec.toString() + " range " + super.toString();
       default:
         throw new IllegalStateException("Unexpected mode: " + mode);
     }
@@ -420,8 +446,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     checkArgument(
         mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
     if (getEndOffset() == Long.MAX_VALUE) {
-      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-      return factory.getSizeBytes(fileOrPatternSpec);
+      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
+      return factory.getSizeBytes(fileOrPatternSpec.get());
     } else {
       return getEndOffset();
     }
@@ -493,8 +519,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     @Override
     protected final boolean startImpl() throws IOException {
       FileBasedSource<T> source = getCurrentSource();
-      IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
-      this.channel = factory.open(source.getFileOrPatternSpec());
+      IOChannelFactory factory = IOChannelUtils.getFactory(
+          source.getFileOrPatternSpecProvider().get());
+      this.channel = factory.open(source.getFileOrPatternSpecProvider().get());
 
       if (channel instanceof SeekableByteChannel) {
         SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
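
The FileBasedSource changes follow one pattern throughout: anything that must run on the worker (splitting, size estimation, opening channels) calls get() behind a checkState(isAccessible()), while anything that may run at construction time (display data, toString()) falls back to the provider's own toString(). A compact sketch of that fallback, written as a free-standing helper purely for illustration — displayValue is not a method in the patch:

import org.apache.beam.sdk.options.ValueProvider;

/** Illustration only: choose a printable value without forcing an unbound provider. */
class ProviderDisplaySketch {
  static String displayValue(ValueProvider<String> spec) {
    // Mirrors populateDisplayData/toString above: show the real pattern when it is bound,
    // otherwise show the provider's own description rather than calling get() and failing.
    return spec.isAccessible() ? spec.get() : spec.toString();
  }
}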

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 2dbcda7..84c24ea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -44,6 +45,8 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -142,6 +145,13 @@ public class TextIO {
     }
 
     /**
+     * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
+     */
+    public static Bound<String> from(ValueProvider<String> filepattern) {
+      return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern);
+    }
+
+    /**
      * Returns a transform for reading text files that uses the given
      * {@code Coder<T>} to decode each of the lines of the file into a
      * value of type {@code T}.
@@ -194,7 +204,7 @@ public class TextIO {
      */
    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
      /** The filepattern to read from. */
-      @Nullable private final String filepattern;
+      @Nullable private final ValueProvider<String> filepattern;
 
      /** The Coder to use to decode each line. */
      private final Coder<T> coder;
@@ -209,8 +219,8 @@ public class TextIO {
        this(null, null, coder, true, TextIO.CompressionType.AUTO);
      }
 
-      private Bound(String name, String filepattern, Coder<T> coder, boolean validate,
-          TextIO.CompressionType compressionType) {
+      private Bound(@Nullable String name, @Nullable ValueProvider<String> filepattern,
+          Coder<T> coder, boolean validate, TextIO.CompressionType compressionType) {
        super(name);
        this.coder = coder;
        this.filepattern = filepattern;
@@ -227,6 +237,16 @@ public class TextIO {
        */
      public Bound<T> from(String filepattern) {
+        checkNotNull(filepattern, "Filepattern cannot be empty.");
+        return new Bound<>(name, StaticValueProvider.of(filepattern), coder, validate,
+            compressionType);
+      }
+
+      /**
+       * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
+       */
+      public Bound<T> from(ValueProvider<String> filepattern) {
+        checkNotNull(filepattern, "Filepattern cannot be empty.");
        return new Bound<>(name, filepattern, coder, validate, compressionType);
      }
 
@@ -278,14 +298,15 @@ public class TextIO {
        }
 
        if (validate) {
+          checkState(filepattern.isAccessible(), "Cannot validate with a RVP.");
          try {
            checkState(
-                !IOChannelUtils.getFactory(filepattern).match(filepattern).isEmpty(),
+                !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(),
                "Unable to find any files matching %s", filepattern);
          } catch (IOException e) {
            throw new IllegalStateException(
-                String.format("Failed to validate %s", filepattern), e);
+                String.format("Failed to validate %s", filepattern.get()), e);
          }
        }
 
@@ -324,12 +345,14 @@ public class TextIO {
      public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
 
+        String filepatternDisplay = filepattern.isAccessible()
+            ? filepattern.get() : filepattern.toString();
        builder
            .add(DisplayData.item("compressionType", compressionType.toString())
              .withLabel("Compression Type"))
            .addIfNotDefault(DisplayData.item("validation", validate)
              .withLabel("Validation Enabled"), true)
-            .addIfNotNull(DisplayData.item("filePattern", filepattern)
+            .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
              .withLabel("File Pattern"));
      }
 
@@ -339,7 +362,7 @@ public class TextIO {
      }
 
      public String getFilepattern() {
-        return filepattern;
+        return filepattern.get();
      }
 
      public boolean needsValidation() {
@@ -870,6 +893,12 @@ public class TextIO {
      this.coder = coder;
    }
 
+    @VisibleForTesting
+    TextSource(ValueProvider<String> fileSpec, Coder<T> coder) {
+      super(fileSpec, 1L);
+      this.coder = coder;
+    }
+
    private TextSource(String fileName, long start, long end, Coder<T> coder) {
      super(fileName, 1L, start, end);
      this.coder = coder;
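
Taken together, the patch lets a pipeline defer its input pattern until run time. A hedged end-to-end sketch of how the new from(ValueProvider) overload might be wired to a ValueProvider-typed pipeline option — the options interface, option name, and file pattern below are hypothetical, the sketch assumes ValueProvider-typed options are supported by PipelineOptionsFactory, and validation is disabled exactly as in the test above because the value is not available while the graph is built:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.PCollection;

public class RuntimeInputSketch {
  /** Hypothetical options interface; the runner supplies the value when the job executes. */
  public interface RuntimeInputOptions extends PipelineOptions {
    @Description("File pattern to read; may be bound only at execution time")
    ValueProvider<String> getInputFilePattern();
    void setInputFilePattern(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    RuntimeInputOptions options =
        PipelineOptionsFactory.fromArgs(args).as(RuntimeInputOptions.class);
    Pipeline p = Pipeline.create(options);

    // Construction-time validation is skipped because the pattern may not be bound yet.
    PCollection<String> lines =
        p.apply(TextIO.Read.from(options.getInputFilePattern()).withoutValidation());

    p.run();
  }
}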