Repository: beam Updated Branches: refs/heads/master f458065da -> 49245080a
[BEAM-2391] Clone Scan in HBaseReader Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1da8da79 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1da8da79 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1da8da79 Branch: refs/heads/master Commit: 1da8da79dcc623c53b35d97419566b736447f6b6 Parents: f458065 Author: Dan Halperin <dhalp...@google.com> Authored: Thu May 18 16:19:29 2017 -0400 Committer: Ismaël Mejía <ieme...@apache.org> Committed: Fri May 19 10:49:55 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 5 +++-- .../org/apache/beam/sdk/io/hbase/HBaseIOTest.java | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1da8da79/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 3c42da9..849873c 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -487,8 +487,9 @@ public class HBaseIO { connection = ConnectionFactory.createConnection(configuration); TableName tableName = TableName.valueOf(tableId); Table table = connection.getTable(tableName); - Scan scan = source.read.serializableScan.get(); - scanner = table.getScanner(scan); + // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it.
+ Scan scanClone = new Scan(source.read.serializableScan.get()); + scanner = table.getScanner(scanClone); iter = scanner.iterator(); return advance(); } http://git-wip-us.apache.org/repos/asf/beam/blob/1da8da79/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index 1cdfc7f..4a06789 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.hbase.HBaseIO.HBaseSource; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; @@ -205,6 +206,22 @@ public class HBaseIOTest { assertSourcesEqualReferenceSource(source, splits, null /* options */); } + /** Tests that a {@link HBaseSource} can be read twice, verifying its immutability. */ + @Test + public void testReadingSourceTwice() throws Exception { + final String table = "TEST-READING-TWICE"; + final int numRows = 10; + + // Set up test table data and sample row keys for size estimation and splitting. + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); + assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows)); + // second read. 
+ assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows)); + } /** Tests reading all rows using a filter. */ @Test