Repository: beam
Updated Branches:
  refs/heads/master f458065da -> 49245080a


[BEAM-2391] Clone Scan in HBaseReader


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1da8da79
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1da8da79
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1da8da79

Branch: refs/heads/master
Commit: 1da8da79dcc623c53b35d97419566b736447f6b6
Parents: f458065
Author: Dan Halperin <dhalp...@google.com>
Authored: Thu May 18 16:19:29 2017 -0400
Committer: Ismaël Mejía <ieme...@apache.org>
Committed: Fri May 19 10:49:55 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/hbase/HBaseIO.java |  5 +++--
 .../org/apache/beam/sdk/io/hbase/HBaseIOTest.java  | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1da8da79/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 3c42da9..849873c 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -487,8 +487,9 @@ public class HBaseIO {
             connection = ConnectionFactory.createConnection(configuration);
             TableName tableName = TableName.valueOf(tableId);
             Table table = connection.getTable(tableName);
-            Scan scan = source.read.serializableScan.get();
-            scanner = table.getScanner(scan);
+            // [BEAM-2319] We have to clone the Scan because the underlying 
scanner may mutate it.
+            Scan scanClone = new Scan(source.read.serializableScan.get());
+            scanner = table.getScanner(scanClone);
             iter = scanner.iterator();
             return advance();
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/1da8da79/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 1cdfc7f..4a06789 100644
--- 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.hbase.HBaseIO.HBaseSource;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
@@ -205,6 +206,22 @@ public class HBaseIOTest {
         assertSourcesEqualReferenceSource(source, splits, null /* options */);
     }
 
+    /** Tests that a {@link HBaseSource} can be read twice, verifying its 
immutability. */
+    @Test
+    public void testReadingSourceTwice() throws Exception {
+        final String table = "TEST-READING-TWICE";
+        final int numRows = 10;
+
+        // Set up test table data and sample row keys for size estimation and 
splitting.
+        createTable(table);
+        writeData(table, numRows);
+
+        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId(table);
+        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes 
*/);
+        assertThat(SourceTestUtils.readFromSource(source, null), 
hasSize(numRows));
+        // second read.
+        assertThat(SourceTestUtils.readFromSource(source, null), 
hasSize(numRows));
+    }
 
     /** Tests reading all rows using a filter. */
     @Test

Reply via email to