Repository: beam Updated Branches: refs/heads/master bcc2806c0 -> 4e425ca1b
Fix HDFSFileSource's split size estimate Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/086e1674 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/086e1674 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/086e1674 Branch: refs/heads/master Commit: 086e1674641be37aafd1235f2ac2db2da012376b Parents: bcc2806 Author: Igor Bernstein <[email protected]> Authored: Sun Jan 29 00:00:02 2017 -0500 Committer: Dan Halperin <[email protected]> Committed: Mon Jan 30 21:02:55 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 10 +++++++++ .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 23 ++++++++++++++++++++ 2 files changed, 33 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/086e1674/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 61660de..1affb4a 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -237,7 +237,14 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> { @Override public long getEstimatedSizeBytes(PipelineOptions options) { long size = 0; + try { + // If this source represents a split from splitIntoBundles, then return the size of the split, + // rather then the entire input + if (serializableSplit != null) { + return serializableSplit.getSplit().getLength(); + } + Job job = Job.getInstance(); // new instance for (FileStatus st : listStatus(createFormat(job), job)) { size += st.getLen(); } @@ -245,6 
+252,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> { } catch (IOException | NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) { // ignore, and return 0 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // ignore, and return 0 } return size; } http://git-wip-us.apache.org/repos/asf/beam/blob/086e1674/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java index 6145952..4c3f1ce 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -151,6 +151,29 @@ public class HDFSFileSourceTest { assertTrue(nonEmptySplits > 2); } + @Test + public void testSplitEstimatedSize() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); + File file = createFileWithData("tmp.avro", expectedResults); + + HDFSFileSource<IntWritable, Text> source = + HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + long originalSize = source.getEstimatedSizeBytes(options); + long splitTotalSize = 0; + List<? 
extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles( + SequenceFile.SYNC_INTERVAL, options + ); + for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) { + splitTotalSize += splitSource.getEstimatedSizeBytes(options); + } + // Assert that the estimated size of the whole is the sum of its parts + assertEquals(originalSize, splitTotalSize); + } + private File createFileWithData(String filename, List<KV<IntWritable, Text>> records) throws IOException { File tmpFile = tmpFolder.newFile(filename);
