Repository: beam
Updated Branches:
  refs/heads/master cdf050c0d -> 77a0a2afc


http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 0b7f203..e6f7ac4 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -75,418 +75,408 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Test HBaseIO.
- */
+/** Test HBaseIO. */
 @RunWith(JUnit4.class)
 public class HBaseIOTest {
-    @Rule public final transient TestPipeline p = TestPipeline.create();
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
-    private static HBaseTestingUtility htu;
-    private static HBaseAdmin admin;
-
-    private static final Configuration conf = HBaseConfiguration.create();
-    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
-    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
-    private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email");
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-        // Try to bind the hostname to localhost to solve an issue when it is 
not configured or
-        // no DNS resolution available.
-        conf.setStrings("hbase.master.hostname", "localhost");
-        conf.setStrings("hbase.regionserver.hostname", "localhost");
-        htu = new HBaseTestingUtility(conf);
-
-        // We don't use the full htu.startMiniCluster() to avoid starting 
unneeded HDFS/MR daemons
-        htu.startMiniZKCluster();
-        MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4);
-        hbm.waitForActiveAndReadyMaster();
-
-        admin = htu.getHBaseAdmin();
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        if (admin != null) {
-            admin.close();
-            admin = null;
-        }
-        if (htu != null) {
-            htu.shutdownMiniHBaseCluster();
-            htu.shutdownMiniZKCluster();
-            htu = null;
-        }
-    }
-
-    @Test
-    public void testReadBuildsCorrectly() {
-        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId("table");
-        assertEquals("table", read.getTableId());
-        assertNotNull("configuration", read.getConfiguration());
-    }
-
-    @Test
-    public void testReadBuildsCorrectlyInDifferentOrder() {
-        HBaseIO.Read read = 
HBaseIO.read().withTableId("table").withConfiguration(conf);
-        assertEquals("table", read.getTableId());
-        assertNotNull("configuration", read.getConfiguration());
-    }
-
-    @Test
-    public void testWriteBuildsCorrectly() {
-        HBaseIO.Write write = 
HBaseIO.write().withConfiguration(conf).withTableId("table");
-        assertEquals("table", write.getTableId());
-        assertNotNull("configuration", write.getConfiguration());
-    }
-
-    @Test
-    public void testWriteBuildsCorrectlyInDifferentOrder() {
-        HBaseIO.Write write = 
HBaseIO.write().withTableId("table").withConfiguration(conf);
-        assertEquals("table", write.getTableId());
-        assertNotNull("configuration", write.getConfiguration());
-    }
-
-    @Test
-    public void testWriteValidationFailsMissingTable() {
-        HBaseIO.Write write = HBaseIO.write().withConfiguration(conf);
-        thrown.expect(IllegalArgumentException.class);
-        write.validate(null /* input */);
-    }
-
-    @Test
-    public void testWriteValidationFailsMissingConfiguration() {
-        HBaseIO.Write write = HBaseIO.write().withTableId("table");
-        thrown.expect(IllegalArgumentException.class);
-        write.validate(null /* input */);
-    }
-
-    /** Tests that when reading from a non-existent table, the read fails. */
-    @Test
-    public void testReadingFailsTableDoesNotExist() throws Exception {
-        final String table = "TEST-TABLE-INVALID";
-        // Exception will be thrown by read.validate() when read is applied.
-        thrown.expect(IllegalArgumentException.class);
-        thrown.expectMessage(String.format("Table %s does not exist", table));
-        runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table),
-                new ArrayList<Result>());
-    }
-
-    /** Tests that when reading from an empty table, the read succeeds. */
-    @Test
-    public void testReadingEmptyTable() throws Exception {
-        final String table = "TEST-EMPTY-TABLE";
-        createTable(table);
-        runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table),
-                new ArrayList<Result>());
-    }
-
-    @Test
-    public void testReading() throws Exception {
-        final String table = "TEST-MANY-ROWS-TABLE";
-        final int numRows = 1001;
-        createTable(table);
-        writeData(table, numRows);
-        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), 
1001);
-    }
-
-    /** Tests reading all rows from a split table. */
-    @Test
-    public void testReadingWithSplits() throws Exception {
-        final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
-        final int numRows = 1500;
-        final int numRegions = 4;
-        final long bytesPerRow = 100L;
-
-        // Set up test table data and sample row keys for size estimation and 
splitting.
-        createTable(table);
-        writeData(table, numRows);
-
-        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId(table);
-        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes 
*/);
-        List<? extends BoundedSource<Result>> splits =
-                source.split(numRows * bytesPerRow / numRegions,
-                        null /* options */);
-
-        // Test num splits and split equality.
-        assertThat(splits, hasSize(4));
-        assertSourcesEqualReferenceSource(source, splits, null /* options */);
-    }
-
-    /** Tests that a {@link HBaseSource} can be read twice, verifying its 
immutability. */
-    @Test
-    public void testReadingSourceTwice() throws Exception {
-        final String table = "TEST-READING-TWICE";
-        final int numRows = 10;
-
-        // Set up test table data and sample row keys for size estimation and 
splitting.
-        createTable(table);
-        writeData(table, numRows);
-
-        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId(table);
-        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes 
*/);
-        assertThat(SourceTestUtils.readFromSource(source, null), 
hasSize(numRows));
-        // second read.
-        assertThat(SourceTestUtils.readFromSource(source, null), 
hasSize(numRows));
-    }
-
-    /** Tests reading all rows using a filter. */
-    @Test
-    public void testReadingWithFilter() throws Exception {
-        final String table = "TEST-FILTER-TABLE";
-        final int numRows = 1001;
-
-        createTable(table);
-        writeData(table, numRows);
-
-        String regex = ".*17.*";
-        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
-                new RegexStringComparator(regex));
-        HBaseIO.Read read =
-                
HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter);
-        runReadTestLength(read, 20);
-    }
-
-    /**
-     * Tests reading all rows using key ranges. Tests a prefix [), a suffix 
(], and a restricted
-     * range [] and that some properties hold across them.
-     */
-    @Test
-    public void testReadingWithKeyRange() throws Exception {
-        final String table = "TEST-KEY-RANGE-TABLE";
-        final int numRows = 1001;
-        final byte[] startRow = "2".getBytes();
-        final byte[] stopRow = "9".getBytes();
-        final ByteKey startKey = ByteKey.copyFrom(startRow);
-
-        createTable(table);
-        writeData(table, numRows);
-
-        // Test prefix: [beginning, startKey).
-        final ByteKeyRange prefixRange = 
ByteKeyRange.ALL_KEYS.withEndKey(startKey);
-        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
-                .withKeyRange(prefixRange), 126);
-
-        // Test suffix: [startKey, end).
-        final ByteKeyRange suffixRange = 
ByteKeyRange.ALL_KEYS.withStartKey(startKey);
-        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
-                .withKeyRange(suffixRange), 875);
-
-        // Test restricted range: [startKey, endKey).
-        // This one tests the second signature of .withKeyRange
-        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
-                .withKeyRange(startRow, stopRow), 441);
-    }
-
-    /**
-     * Tests dynamic work rebalancing exhaustively.
-     */
-    @Test
-    public void testReadingSplitAtFractionExhaustive() throws Exception {
-        final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
-        final int numRows = 7;
-
-        createTable(table);
-        writeData(table, numRows);
-
-        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId(table);
-        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes 
*/)
-            .withStartKey(ByteKey.of(48)).withEndKey(ByteKey.of(58));
-
-        assertSplitAtFractionExhaustive(source, null);
-    }
-
-    /**
-     * Unit tests of splitAtFraction.
-     */
-    @Test
-    public void testReadingSplitAtFraction() throws Exception {
-        final String table = "TEST-SPLIT-AT-FRACTION";
-        final int numRows = 10;
-
-        createTable(table);
-        writeData(table, numRows);
-
-        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId(table);
-        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes 
*/);
-
-        // The value k is based on the partitioning schema for the data, in 
this test case,
-        // the partitioning is HEX-based, so we start from 1/16m and the value 
k will be
-        // around 1/256, so the tests are done in approximately k ~= 0.003922 
steps
-        double k = 0.003922;
-
-        assertSplitAtFractionFails(source, 0, k, null /* options */);
-        assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
-        // With 1 items read, all split requests past k will succeed.
-        assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* 
options */);
-        assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* 
options */);
-        // With 3 items read, all split requests past 3k will succeed.
-        assertSplitAtFractionFails(source, 3, 2 * k, null /* options */);
-        assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* 
options */);
-        assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* 
options */);
-        // With 6 items read, all split requests past 6k will succeed.
-        assertSplitAtFractionFails(source, 6, 5 * k, null /* options */);
-        assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* 
options */);
-    }
-
-    @Test
-    public void testReadingDisplayData() {
-        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId("fooTable");
-        DisplayData displayData = DisplayData.from(read);
-        assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
-        assertThat(displayData, hasDisplayItem("configuration"));
-    }
-
-    /** Tests that a record gets written to the service and messages are 
logged. */
-    @Test
-    public void testWriting() throws Exception {
-        final String table = "table";
-        final String key = "key";
-        final String value = "value";
-        final int numMutations = 100;
-
-        createTable(table);
-
-        p.apply("multiple rows", Create.of(makeMutations(key, value, 
numMutations)))
-         .apply("write", 
HBaseIO.write().withConfiguration(conf).withTableId(table));
-        p.run().waitUntilFinish();
-
-        List<Result> results = readTable(table, new Scan());
-        assertEquals(numMutations, results.size());
-    }
-
-    /** Tests that when writing to a non-existent table, the write fails. */
-    @Test
-    public void testWritingFailsTableDoesNotExist() throws Exception {
-        final String table = "TEST-TABLE-DOES-NOT-EXIST";
-
-        p.apply(Create.empty(HBaseMutationCoder.of()))
-         .apply("write", 
HBaseIO.write().withConfiguration(conf).withTableId(table));
-
-        // Exception will be thrown by write.validate() when write is applied.
-        thrown.expect(IllegalArgumentException.class);
-        thrown.expectMessage(String.format("Table %s does not exist", table));
-        p.run();
-    }
-
-    /** Tests that when writing an element fails, the write fails. */
-    @Test
-    public void testWritingFailsBadElement() throws Exception {
-        final String table = "TEST-TABLE-BAD-ELEMENT";
-        final String key = "KEY";
-        createTable(table);
-
-        p.apply(Create.of(makeBadMutation(key)))
-         .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
-
-        thrown.expect(Pipeline.PipelineExecutionException.class);
-        
thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
-        thrown.expectMessage("No columns to insert");
-        p.run().waitUntilFinish();
-    }
-
-    @Test
-    public void testWritingDisplayData() {
-        HBaseIO.Write write = 
HBaseIO.write().withTableId("fooTable").withConfiguration(conf);
-        DisplayData displayData = DisplayData.from(write);
-        assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
-    }
-
-    // HBase helper methods
-    private static void createTable(String tableId) throws Exception {
-        byte[][] splitKeys = {"4".getBytes(), "8".getBytes(), "C".getBytes()};
-        createTable(tableId, COLUMN_FAMILY, splitKeys);
-    }
-
-    private static void createTable(String tableId, byte[] columnFamily, 
byte[][] splitKeys)
-            throws Exception {
-        TableName tableName = TableName.valueOf(tableId);
-        HTableDescriptor desc = new HTableDescriptor(tableName);
-        HColumnDescriptor colDef = new HColumnDescriptor(columnFamily);
-        desc.addFamily(colDef);
-        admin.createTable(desc, splitKeys);
-    }
-
-    /**
-     * Helper function to create a table and return the rows that it created.
-     */
-    private static void writeData(String tableId, int numRows) throws 
Exception {
-        Connection connection = admin.getConnection();
-        TableName tableName = TableName.valueOf(tableId);
-        BufferedMutator mutator = connection.getBufferedMutator(tableName);
-        List<Mutation> mutations = makeTableData(numRows);
-        mutator.mutate(mutations);
-        mutator.flush();
-        mutator.close();
-    }
-
-    private static List<Mutation> makeTableData(int numRows) {
-        List<Mutation> mutations = new ArrayList<>(numRows);
-        for (int i = 0; i < numRows; ++i) {
-            // We pad values in hex order 0,1, ... ,F,0, ...
-            String prefix = String.format("%X", i % 16);
-            // This 21 is to have a key longer than an input
-            byte[] rowKey = Bytes.toBytes(
-                    StringUtils.leftPad("_" + String.valueOf(i), 21, prefix));
-            byte[] value = Bytes.toBytes(String.valueOf(i));
-            byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + 
"@email.com");
-            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, 
COLUMN_NAME, value));
-            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, 
COLUMN_EMAIL, valueEmail));
-        }
-        return mutations;
-    }
-
-    private static ResultScanner scanTable(String tableId, Scan scan) throws 
Exception {
-        Connection connection = ConnectionFactory.createConnection(conf);
-        TableName tableName = TableName.valueOf(tableId);
-        Table table = connection.getTable(tableName);
-        return table.getScanner(scan);
-    }
-
-    private static List<Result> readTable(String tableId, Scan scan) throws 
Exception {
-        ResultScanner scanner = scanTable(tableId, scan);
-        List<Result> results = new ArrayList<>();
-        for (Result result : scanner) {
-            results.add(result);
-        }
-        scanner.close();
-        return results;
-    }
-
-    // Beam helper methods
-    /** Helper function to make a single row mutation to be written. */
-    private static Iterable<Mutation> makeMutations(String key, String value, 
int numMutations) {
-        List<Mutation> mutations = new ArrayList<>();
-        for (int i = 0; i < numMutations; i++) {
-            mutations.add(makeMutation(key + i, value));
-        }
-        return mutations;
-    }
-
-    private static Mutation makeMutation(String key, String value) {
-        return new Put(key.getBytes(StandardCharsets.UTF_8))
-                    .addColumn(COLUMN_FAMILY, COLUMN_NAME, 
Bytes.toBytes(value))
-                    .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, 
Bytes.toBytes(value + "@email.com"));
-    }
-
-    private static Mutation makeBadMutation(String key) {
-        return new Put(key.getBytes());
-    }
-
-    private void runReadTest(HBaseIO.Read read, List<Result> expected) {
-        final String transformId = read.getTableId() + "_" + 
read.getKeyRange();
-        PCollection<Result> rows = p.apply("Read" + transformId, read);
-        PAssert.that(rows).containsInAnyOrder(expected);
-        p.run().waitUntilFinish();
-    }
-
-    private void runReadTestLength(HBaseIO.Read read, long numElements) {
-        final String transformId = read.getTableId() + "_" + 
read.getKeyRange();
-        PCollection<Result> rows = p.apply("Read" + transformId, read);
-        PAssert.thatSingleton(rows.apply("Count" + transformId,
-                Count.<Result>globally())).isEqualTo(numElements);
-        p.run().waitUntilFinish();
-    }
+  @Rule public final transient TestPipeline p = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static HBaseTestingUtility htu;
+  private static HBaseAdmin admin;
+
+  private static final Configuration conf = HBaseConfiguration.create();
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+  private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
+  private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email");
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    // Try to bind the hostname to localhost to solve an issue when it is not configured or
+    // no DNS resolution available.
+    conf.setStrings("hbase.master.hostname", "localhost");
+    conf.setStrings("hbase.regionserver.hostname", "localhost");
+    htu = new HBaseTestingUtility(conf);
+
+    // We don't use the full htu.startMiniCluster() to avoid starting unneeded HDFS/MR daemons
+    htu.startMiniZKCluster();
+    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4);
+    hbm.waitForActiveAndReadyMaster();
+
+    admin = htu.getHBaseAdmin();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    if (admin != null) {
+      admin.close();
+      admin = null;
+    }
+    if (htu != null) {
+      htu.shutdownMiniHBaseCluster();
+      htu.shutdownMiniZKCluster();
+      htu = null;
+    }
+  }
+
+  @Test
+  public void testReadBuildsCorrectly() {
+    HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("table");
+    assertEquals("table", read.getTableId());
+    assertNotNull("configuration", read.getConfiguration());
+  }
+
+  @Test
+  public void testReadBuildsCorrectlyInDifferentOrder() {
+    HBaseIO.Read read = HBaseIO.read().withTableId("table").withConfiguration(conf);
+    assertEquals("table", read.getTableId());
+    assertNotNull("configuration", read.getConfiguration());
+  }
+
+  @Test
+  public void testWriteBuildsCorrectly() {
+    HBaseIO.Write write = HBaseIO.write().withConfiguration(conf).withTableId("table");
+    assertEquals("table", write.getTableId());
+    assertNotNull("configuration", write.getConfiguration());
+  }
+
+  @Test
+  public void testWriteBuildsCorrectlyInDifferentOrder() {
+    HBaseIO.Write write = HBaseIO.write().withTableId("table").withConfiguration(conf);
+    assertEquals("table", write.getTableId());
+    assertNotNull("configuration", write.getConfiguration());
+  }
+
+  @Test
+  public void testWriteValidationFailsMissingTable() {
+    HBaseIO.Write write = HBaseIO.write().withConfiguration(conf);
+    thrown.expect(IllegalArgumentException.class);
+    write.validate(null /* input */);
+  }
+
+  @Test
+  public void testWriteValidationFailsMissingConfiguration() {
+    HBaseIO.Write write = HBaseIO.write().withTableId("table");
+    thrown.expect(IllegalArgumentException.class);
+    write.validate(null /* input */);
+  }
+
+  /** Tests that when reading from a non-existent table, the read fails. */
+  @Test
+  public void testReadingFailsTableDoesNotExist() throws Exception {
+    final String table = "TEST-TABLE-INVALID";
+    // Exception will be thrown by read.validate() when read is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+    runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), new ArrayList<Result>());
+  }
+
+  /** Tests that when reading from an empty table, the read succeeds. */
+  @Test
+  public void testReadingEmptyTable() throws Exception {
+    final String table = "TEST-EMPTY-TABLE";
+    createTable(table);
+    runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), new ArrayList<Result>());
+  }
+
+  @Test
+  public void testReading() throws Exception {
+    final String table = "TEST-MANY-ROWS-TABLE";
+    final int numRows = 1001;
+    createTable(table);
+    writeData(table, numRows);
+    runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), 1001);
+  }
+
+  /** Tests reading all rows from a split table. */
+  @Test
+  public void testReadingWithSplits() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 1500;
+    final int numRegions = 4;
+    final long bytesPerRow = 100L;
+
+    // Set up test table data and sample row keys for size estimation and splitting.
+    createTable(table);
+    writeData(table, numRows);
+
+    HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+    HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
+    List<? extends BoundedSource<Result>> splits =
+        source.split(numRows * bytesPerRow / numRegions, null /* options */);
+
+    // Test num splits and split equality.
+    assertThat(splits, hasSize(4));
+    assertSourcesEqualReferenceSource(source, splits, null /* options */);
+  }
+
+  /** Tests that a {@link HBaseSource} can be read twice, verifying its immutability. */
+  @Test
+  public void testReadingSourceTwice() throws Exception {
+    final String table = "TEST-READING-TWICE";
+    final int numRows = 10;
+
+    // Set up test table data and sample row keys for size estimation and splitting.
+    createTable(table);
+    writeData(table, numRows);
+
+    HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+    HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
+    assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows));
+    // second read.
+    assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows));
+  }
+
+  /** Tests reading all rows using a filter. */
+  @Test
+  public void testReadingWithFilter() throws Exception {
+    final String table = "TEST-FILTER-TABLE";
+    final int numRows = 1001;
+
+    createTable(table);
+    writeData(table, numRows);
+
+    String regex = ".*17.*";
+    Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));
+    HBaseIO.Read read =
+        HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter);
+    runReadTestLength(read, 20);
+  }
+
+  /**
+   * Tests reading all rows using key ranges. Tests a prefix [), a suffix (], and a restricted range
+   * [] and that some properties hold across them.
+   */
+  @Test
+  public void testReadingWithKeyRange() throws Exception {
+    final String table = "TEST-KEY-RANGE-TABLE";
+    final int numRows = 1001;
+    final byte[] startRow = "2".getBytes();
+    final byte[] stopRow = "9".getBytes();
+    final ByteKey startKey = ByteKey.copyFrom(startRow);
+
+    createTable(table);
+    writeData(table, numRows);
+
+    // Test prefix: [beginning, startKey).
+    final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey);
+    runReadTestLength(
+        HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(prefixRange), 126);
+
+    // Test suffix: [startKey, end).
+    final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey);
+    runReadTestLength(
+        HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(suffixRange), 875);
+
+    // Test restricted range: [startKey, endKey).
+    // This one tests the second signature of .withKeyRange
+    runReadTestLength(
+        HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(startRow, stopRow),
+        441);
+  }
+
+  /** Tests dynamic work rebalancing exhaustively. */
+  @Test
+  public void testReadingSplitAtFractionExhaustive() throws Exception {
+    final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
+    final int numRows = 7;
+
+    createTable(table);
+    writeData(table, numRows);
+
+    HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+    HBaseSource source =
+        new HBaseSource(read, null /* estimatedSizeBytes */)
+            .withStartKey(ByteKey.of(48))
+            .withEndKey(ByteKey.of(58));
+
+    assertSplitAtFractionExhaustive(source, null);
+  }
+
+  /** Unit tests of splitAtFraction. */
+  @Test
+  public void testReadingSplitAtFraction() throws Exception {
+    final String table = "TEST-SPLIT-AT-FRACTION";
+    final int numRows = 10;
+
+    createTable(table);
+    writeData(table, numRows);
+
+    HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+    HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
+
+    // The value k is based on the partitioning schema for the data, in this test case,
+    // the partitioning is HEX-based, so we start from 1/16m and the value k will be
+    // around 1/256, so the tests are done in approximately k ~= 0.003922 steps
+    double k = 0.003922;
+
+    assertSplitAtFractionFails(source, 0, k, null /* options */);
+    assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
+    // With 1 items read, all split requests past k will succeed.
+    assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
+    // With 3 items read, all split requests past 3k will succeed.
+    assertSplitAtFractionFails(source, 3, 2 * k, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* options */);
+    // With 6 items read, all split requests past 6k will succeed.
+    assertSplitAtFractionFails(source, 6, 5 * k, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
+  }
+
+  @Test
+  public void testReadingDisplayData() {
+    HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable");
+    DisplayData displayData = DisplayData.from(read);
+    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    assertThat(displayData, hasDisplayItem("configuration"));
+  }
+
+  /** Tests that a record gets written to the service and messages are logged. */
+  @Test
+  public void testWriting() throws Exception {
+    final String table = "table";
+    final String key = "key";
+    final String value = "value";
+    final int numMutations = 100;
+
+    createTable(table);
+
+    p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)))
+        .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+    p.run().waitUntilFinish();
+
+    List<Result> results = readTable(table, new Scan());
+    assertEquals(numMutations, results.size());
+  }
+
+  /** Tests that when writing to a non-existent table, the write fails. */
+  @Test
+  public void testWritingFailsTableDoesNotExist() throws Exception {
+    final String table = "TEST-TABLE-DOES-NOT-EXIST";
+
+    p.apply(Create.empty(HBaseMutationCoder.of()))
+        .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+
+    // Exception will be thrown by write.validate() when write is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+    p.run();
+  }
+
+  /** Tests that when writing an element fails, the write fails. */
+  @Test
+  public void testWritingFailsBadElement() throws Exception {
+    final String table = "TEST-TABLE-BAD-ELEMENT";
+    final String key = "KEY";
+    createTable(table);
+
+    p.apply(Create.of(makeBadMutation(key)))
+        .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
+    thrown.expectMessage("No columns to insert");
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWritingDisplayData() {
+    HBaseIO.Write write = HBaseIO.write().withTableId("fooTable").withConfiguration(conf);
+    DisplayData displayData = DisplayData.from(write);
+    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+  }
+
+  // HBase helper methods
+  private static void createTable(String tableId) throws Exception {
+    byte[][] splitKeys = {"4".getBytes(), "8".getBytes(), "C".getBytes()};
+    createTable(tableId, COLUMN_FAMILY, splitKeys);
+  }
+
+  private static void createTable(String tableId, byte[] columnFamily, byte[][] splitKeys)
+      throws Exception {
+    TableName tableName = TableName.valueOf(tableId);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor colDef = new HColumnDescriptor(columnFamily);
+    desc.addFamily(colDef);
+    admin.createTable(desc, splitKeys);
+  }
+
+  /** Helper function to create a table and return the rows that it created. */
+  private static void writeData(String tableId, int numRows) throws Exception {
+    Connection connection = admin.getConnection();
+    TableName tableName = TableName.valueOf(tableId);
+    BufferedMutator mutator = connection.getBufferedMutator(tableName);
+    List<Mutation> mutations = makeTableData(numRows);
+    mutator.mutate(mutations);
+    mutator.flush();
+    mutator.close();
+  }
+
+  private static List<Mutation> makeTableData(int numRows) {
+    List<Mutation> mutations = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      // We pad values in hex order 0,1, ... ,F,0, ...
+      String prefix = String.format("%X", i % 16);
+      // This 21 is to have a key longer than an input
+      byte[] rowKey = Bytes.toBytes(StringUtils.leftPad("_" + String.valueOf(i), 21, prefix));
+      byte[] value = Bytes.toBytes(String.valueOf(i));
+      byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + "@email.com");
+      mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_NAME, value));
+      mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_EMAIL, valueEmail));
+    }
+    return mutations;
+  }
+
+  private static ResultScanner scanTable(String tableId, Scan scan) throws Exception {
+    Connection connection = ConnectionFactory.createConnection(conf);
+    TableName tableName = TableName.valueOf(tableId);
+    Table table = connection.getTable(tableName);
+    return table.getScanner(scan);
+  }
+
+  private static List<Result> readTable(String tableId, Scan scan) throws Exception {
+    ResultScanner scanner = scanTable(tableId, scan);
+    List<Result> results = new ArrayList<>();
+    for (Result result : scanner) {
+      results.add(result);
+    }
+    scanner.close();
+    return results;
+  }
+
+  // Beam helper methods
+  /** Helper function to make a single row mutation to be written. */
+  private static Iterable<Mutation> makeMutations(String key, String value, int numMutations) {
+    List<Mutation> mutations = new ArrayList<>();
+    for (int i = 0; i < numMutations; i++) {
+      mutations.add(makeMutation(key + i, value));
+    }
+    return mutations;
+  }
+
+  /**
+   * Builds a Put for row {@code key} that sets the name column to {@code value} and the email
+   * column to {@code value} followed by {@code "@email.com"}.
+   */
+  private static Mutation makeMutation(String key, String value) {
+    byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
+    byte[] name = Bytes.toBytes(value);
+    byte[] email = Bytes.toBytes(value + "@email.com");
+    Put put = new Put(rowKey);
+    put.addColumn(COLUMN_FAMILY, COLUMN_NAME, name);
+    put.addColumn(COLUMN_FAMILY, COLUMN_EMAIL, email);
+    return put;
+  }
+
+  /**
+   * Builds a Put for row {@code key} that carries only the row key, with no column added.
+   *
+   * @param key row key, encoded as UTF-8 like in {@code makeMutation}
+   * @return the column-less Put
+   */
+  private static Mutation makeBadMutation(String key) {
+    // Use an explicit charset: the no-arg getBytes() depends on the platform default encoding.
+    return new Put(key.getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Applies {@code read} to the test pipeline, asserts that it yields exactly the {@code expected}
+   * results in any order, then runs the pipeline and waits for it to finish.
+   */
+  private void runReadTest(HBaseIO.Read read, List<Result> expected) {
+    // Table id and key range are part of the step name.
+    final String stepName = "Read" + read.getTableId() + "_" + read.getKeyRange();
+    PCollection<Result> readRows = p.apply(stepName, read);
+    PAssert.that(readRows).containsInAnyOrder(expected);
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * Applies {@code read} to the test pipeline, asserts that the global count of the results equals
+   * {@code numElements}, then runs the pipeline and waits for it to finish.
+   */
+  private void runReadTestLength(HBaseIO.Read read, long numElements) {
+    // Table id and key range are part of both step names.
+    final String suffix = read.getTableId() + "_" + read.getKeyRange();
+    PCollection<Result> readRows = p.apply("Read" + suffix, read);
+    PCollection<Long> rowCount = readRows.apply("Count" + suffix, Count.<Result>globally());
+    PAssert.thatSingleton(rowCount).isEqualTo(numElements);
+    p.run().waitUntilFinish();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java
 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java
index 5bf2d80..41525dc 100644
--- 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java
+++ 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java
@@ -28,9 +28,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for HBaseMutationCoder.
- */
+/** Tests for HBaseMutationCoder. */
 @RunWith(JUnit4.class)
 public class HBaseMutationCoderTest {
   @Rule public final ExpectedException thrown = ExpectedException.none();

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java
 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java
index c6b27d6..5af5e16 100644
--- 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java
+++ 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java
@@ -25,9 +25,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for HBaseResultCoder.
- */
+/** Tests for HBaseResultCoder. */
 @RunWith(JUnit4.class)
 public class HBaseResultCoderTest {
   @Rule public final ExpectedException thrown = ExpectedException.none();

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java
 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java
index 49eb4e3..7d2fd28 100644
--- 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java
+++ 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java
@@ -28,14 +28,12 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for SerializableScan.
- */
+/** Tests for SerializableScan. */
 @RunWith(JUnit4.class)
 public class SerializableScanTest {
   @Rule public final ExpectedException thrown = ExpectedException.none();
   private static final SerializableScan DEFAULT_SERIALIZABLE_SCAN =
-          new SerializableScan(new Scan());
+      new SerializableScan(new Scan());
 
   @Test
   public void testSerializationDeserialization() throws Exception {

Reply via email to