Repository: beam Updated Branches: refs/heads/master c489686e4 -> 99056df36
Improves logging of FileBasedSource size estimates Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/91fb481b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/91fb481b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/91fb481b Branch: refs/heads/master Commit: 91fb481b3c0bf217320ad772b2c5a55eb90e1ac5 Parents: c489686 Author: Eugene Kirpichov <[email protected]> Authored: Tue Mar 28 15:20:05 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Mar 28 16:37:14 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSource.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/91fb481b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 5659d5b..35629d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -222,6 +222,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { fileOrPatternSpec, System.currentTimeMillis() - startTime); } + LOG.info( + "Filepattern {} matched {} files with total size {}", + fileOrPatternSpec.get(), + inputs.size(), + totalSize); return totalSize; } else { long start = getStartOffset(); @@ -286,8 +291,18 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { Collections.shuffle(selectedFiles); selectedFiles = selectedFiles.subList(0, sampleSize); - return files.size() * getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory) - / selectedFiles.size(); + long exactTotalSampleSize = getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory); + double avgSize = 1.0 * exactTotalSampleSize / selectedFiles.size(); + long totalSize = Math.round(files.size() * avgSize); + LOG.info( + "Sampling {} files gave {} total bytes ({} average per file), " + + "inferring total size of {} files to be {}", + selectedFiles.size(), + exactTotalSampleSize, + avgSize, + files.size(), + totalSize); + return totalSize; } @Override
