Repository: beam
Updated Branches:
  refs/heads/master 2b0e699b8 -> e1791c3f8


[BEAM-2212] FileBasedSource: refactor to remove repeated uses of fileOrPatternSpec.get()

Makes it less likely that error and log messages print a ValueProvider instead of
its runtime value.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9423babd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9423babd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9423babd

Branch: refs/heads/master
Commit: 9423babd8f827e843723c218441e9a91aaa7b361
Parents: 5bac40e
Author: Dan Halperin <[email protected]>
Authored: Mon May 8 09:59:16 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Mon May 8 11:48:39 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++--------
 1 file changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9423babd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 4e07342..d4413c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -196,19 +196,20 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // This implementation of method getEstimatedSizeBytes is provided to 
simplify subclasses. Here
     // we perform the size estimation of files and file patterns using the 
interface provided by
     // FileSystem.
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot estimate size of a FileBasedSource with inaccessible file 
pattern: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
-      checkState(fileOrPatternSpec.isAccessible(),
-                 "Size estimation should be done at execution time.");
-      String pattern = fileOrPatternSpec.get();
       long totalSize = 0;
-      List<MatchResult> inputs =
-          FileSystems.match(Collections.singletonList(pattern));
+      List<MatchResult> inputs = 
FileSystems.match(Collections.singletonList(fileOrPattern));
       MatchResult result = Iterables.getOnlyElement(inputs);
       checkArgument(
           result.status() == Status.OK,
           "Error matching the pattern or glob %s: status %s",
-          pattern,
+          fileOrPattern,
           result.status());
       List<Metadata> allMatches = result.metadata();
       for (Metadata metadata : allMatches) {
@@ -216,7 +217,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
       LOG.info(
           "Filepattern {} matched {} files with total size {}",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           allMatches.size(),
           totalSize);
       return totalSize;
@@ -245,14 +246,17 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // split a FileBasedSource based on a file pattern to FileBasedSources 
based on full single
     // files. For files that can be efficiently seeked, we further split 
FileBasedSources based on
     // those files to FileBasedSources based on sub ranges of single files.
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot split a FileBasedSource without access to the file or pattern 
specification: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      checkState(fileOrPatternSpec.isAccessible(),
-                 "Bundle splitting should only happen at execution time.");
-      List<Metadata> expandedFiles = 
FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> expandedFiles = 
FileBasedSource.expandFilePattern(fileOrPattern);
       checkArgument(!expandedFiles.isEmpty(),
-          "Unable to find any files matching %s", fileOrPatternSpec.get());
+          "Unable to find any files matching %s", fileOrPattern);
       List<FileBasedSource<T>> splitResults = new 
ArrayList<>(expandedFiles.size());
       for (Metadata metadata : expandedFiles) {
         FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, 
metadata.sizeBytes());
@@ -268,7 +272,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       LOG.info(
           "Splitting filepattern {} into bundles of size {} took {} ms "
               + "and produced {} files and {} bundles",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           desiredBundleSizeBytes,
           System.currentTimeMillis() - startTime,
           expandedFiles.size(),
@@ -283,7 +287,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       } else {
         LOG.debug("The source for file {} is not split into sub-range based 
sources since "
             + "the file is not seekable",
-            fileOrPatternSpec);
+            fileOrPattern);
         return ImmutableList.of(this);
       }
     }
@@ -315,10 +319,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public final BoundedReader<T> createReader(PipelineOptions options) throws 
IOException {
     // Validate the current source prior to creating a reader for it.
     this.validate();
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot create a file reader without access to the file or pattern 
specification: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> fileMetadata = 
FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> fileMetadata = 
FileBasedSource.expandFilePattern(fileOrPattern);
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (Metadata metadata : fileMetadata) {
         long endOffset = metadata.sizeBytes();
@@ -327,7 +336,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
       LOG.debug(
           "Creating a reader for file pattern {} took {} ms",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           System.currentTimeMillis() - startTime);
       if (fileReaders.size() == 1) {
         return fileReaders.get(0);

Reply via email to