Repository: beam Updated Branches: refs/heads/master 2b0e699b8 -> e1791c3f8
[BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get(). Makes it less likely to have errors from printing ValueProviders instead of runtime values. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9423babd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9423babd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9423babd Branch: refs/heads/master Commit: 9423babd8f827e843723c218441e9a91aaa7b361 Parents: 5bac40e Author: Dan Halperin <[email protected]> Authored: Mon May 8 09:59:16 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 8 11:48:39 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++-------- 1 file changed, 24 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9423babd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 4e07342..d4413c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -196,19 +196,20 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here // we perform the size estimation of files and file patterns using the interface provided by // FileSystem. 
+ checkState( + fileOrPatternSpec.isAccessible(), + "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.", + fileOrPatternSpec); + String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { - checkState(fileOrPatternSpec.isAccessible(), - "Size estimation should be done at execution time."); - String pattern = fileOrPatternSpec.get(); long totalSize = 0; - List<MatchResult> inputs = - FileSystems.match(Collections.singletonList(pattern)); + List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern)); MatchResult result = Iterables.getOnlyElement(inputs); checkArgument( result.status() == Status.OK, "Error matching the pattern or glob %s: status %s", - pattern, + fileOrPattern, result.status()); List<Metadata> allMatches = result.metadata(); for (Metadata metadata : allMatches) { @@ -216,7 +217,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } LOG.info( "Filepattern {} matched {} files with total size {}", - fileOrPatternSpec.get(), + fileOrPattern, allMatches.size(), totalSize); return totalSize; @@ -245,14 +246,17 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { // split a FileBasedSource based on a file pattern to FileBasedSources based on full single // files. For files that can be efficiently seeked, we further split FileBasedSources based on // those files to FileBasedSources based on sub ranges of single files. 
+ checkState( + fileOrPatternSpec.isAccessible(), + "Cannot split a FileBasedSource without access to the file or pattern specification: {}.", + fileOrPatternSpec); + String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - checkState(fileOrPatternSpec.isAccessible(), - "Bundle splitting should only happen at execution time."); - List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get()); + List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern); checkArgument(!expandedFiles.isEmpty(), - "Unable to find any files matching %s", fileOrPatternSpec.get()); + "Unable to find any files matching %s", fileOrPattern); List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size()); for (Metadata metadata : expandedFiles) { FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); @@ -268,7 +272,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { LOG.info( "Splitting filepattern {} into bundles of size {} took {} ms " + "and produced {} files and {} bundles", - fileOrPatternSpec.get(), + fileOrPattern, desiredBundleSizeBytes, System.currentTimeMillis() - startTime, expandedFiles.size(), @@ -283,7 +287,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } else { LOG.debug("The source for file {} is not split into sub-range based sources since " + "the file is not seekable", - fileOrPatternSpec); + fileOrPattern); return ImmutableList.of(this); } } @@ -315,10 +319,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { public final BoundedReader<T> createReader(PipelineOptions options) throws IOException { // Validate the current source prior to creating a reader for it. 
this.validate(); + checkState( + fileOrPatternSpec.isAccessible(), + "Cannot create a file reader without access to the file or pattern specification: {}.", + fileOrPatternSpec); + String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get()); + List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern); List<FileBasedReader<T>> fileReaders = new ArrayList<>(); for (Metadata metadata : fileMetadata) { long endOffset = metadata.sizeBytes(); @@ -327,7 +336,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } LOG.debug( "Creating a reader for file pattern {} took {} ms", - fileOrPatternSpec.get(), + fileOrPattern, System.currentTimeMillis() - startTime); if (fileReaders.size() == 1) { return fileReaders.get(0);
