Repository: beam Updated Branches: refs/heads/release-2.0.0 265405bc8 -> 94d104064
[BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get() Makes it less likely to have errors from printing ValueProviders instead of runtime values Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bff819a9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bff819a9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bff819a9 Branch: refs/heads/release-2.0.0 Commit: bff819a9858c79c6c3232b4c03f262421d325c00 Parents: e0faeee Author: Dan Halperin <[email protected]> Authored: Mon May 8 09:59:16 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 8 14:15:43 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++-------- 1 file changed, 24 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bff819a9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 4e07342..d4413c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -196,19 +196,20 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here // we perform the size estimation of files and file patterns using the interface provided by // FileSystem. + checkState( + fileOrPatternSpec.isAccessible(), + "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.", + fileOrPatternSpec); + String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { - checkState(fileOrPatternSpec.isAccessible(), - "Size estimation should be done at execution time."); - String pattern = fileOrPatternSpec.get(); long totalSize = 0; - List<MatchResult> inputs = - FileSystems.match(Collections.singletonList(pattern)); + List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern)); MatchResult result = Iterables.getOnlyElement(inputs); checkArgument( result.status() == Status.OK, "Error matching the pattern or glob %s: status %s", - pattern, + fileOrPattern, result.status()); List<Metadata> allMatches = result.metadata(); for (Metadata metadata : allMatches) { @@ -216,7 +217,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } LOG.info( "Filepattern {} matched {} files with total size {}", - fileOrPatternSpec.get(), + fileOrPattern, allMatches.size(), totalSize); return totalSize; @@ -245,14 +246,17 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { // split a FileBasedSource based on a file pattern to FileBasedSources based on full single // files. For files that can be efficiently seeked, we further split FileBasedSources based on // those files to FileBasedSources based on sub ranges of single files. + checkState( + fileOrPatternSpec.isAccessible(), + "Cannot split a FileBasedSource without access to the file or pattern specification: {}.", + fileOrPatternSpec); + String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - checkState(fileOrPatternSpec.isAccessible(), - "Bundle splitting should only happen at execution time."); - List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get()); + List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern); checkArgument(!expandedFiles.isEmpty(), - "Unable to find any files matching %s", fileOrPatternSpec.get()); + "Unable to find any files matching %s", fileOrPattern); List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size()); for (Metadata metadata : expandedFiles) { FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); @@ -268,7 +272,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { LOG.info( "Splitting filepattern {} into bundles of size {} took {} ms " + "and produced {} files and {} bundles", - fileOrPatternSpec.get(), + fileOrPattern, desiredBundleSizeBytes, System.currentTimeMillis() - startTime, expandedFiles.size(), @@ -283,7 +287,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } else { LOG.debug("The source for file {} is not split into sub-range based sources since " + "the file is not seekable", - fileOrPatternSpec); + fileOrPattern); return ImmutableList.of(this); } } @@ -315,10 +319,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { public final BoundedReader<T> createReader(PipelineOptions options) throws IOException { // Validate the current source prior to creating a reader for it. this.validate(); + checkState( + fileOrPatternSpec.isAccessible(), + "Cannot create a file reader without access to the file or pattern specification: {}.", + fileOrPatternSpec); + String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get()); + List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern); List<FileBasedReader<T>> fileReaders = new ArrayList<>(); for (Metadata metadata : fileMetadata) { long endOffset = metadata.sizeBytes(); @@ -327,7 +336,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } LOG.debug( "Creating a reader for file pattern {} took {} ms", - fileOrPatternSpec.get(), + fileOrPattern, System.currentTimeMillis() - startTime); if (fileReaders.size() == 1) { return fileReaders.get(0);
