Repository: beam
Updated Branches:
  refs/heads/master 0d639e63d -> ed0d4577d


[BEAM-1157] Add HBaseIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89fae457
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89fae457
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89fae457

Branch: refs/heads/master
Commit: 89fae457e7dc5e0ab1102e2ecdb2a23004f198ba
Parents: 0d639e6
Author: Ismaël Mejía <[email protected]>
Authored: Thu Feb 9 15:56:18 2017 +0100
Committer: Dan Halperin <[email protected]>
Committed: Tue Feb 21 23:19:20 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 sdks/java/io/hbase/pom.xml                      | 238 +++++++
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 696 +++++++++++++++++++
 .../sdk/io/hbase/coders/HBaseMutationCoder.java |  71 ++
 .../sdk/io/hbase/coders/HBaseResultCoder.java   |  54 ++
 .../hbase/coders/SerializableConfiguration.java |  50 ++
 .../sdk/io/hbase/coders/SerializableScan.java   |  49 ++
 .../beam/sdk/io/hbase/coders/package-info.java  |  24 +
 .../apache/beam/sdk/io/hbase/package-info.java  |  24 +
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 430 ++++++++++++
 sdks/java/io/pom.xml                            |   5 +-
 sdks/java/javadoc/pom.xml                       |   5 +
 12 files changed, 1650 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a8f99fd..0a7b79f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -384,6 +384,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-hbase</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-hdfs</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
new file mode 100644
index 0000000..23582d2
--- /dev/null
+++ b/sdks/java/io/hbase/pom.xml
@@ -0,0 +1,238 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-hbase</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: HBase</name>
+  <description>Library to read and write from/to HBase</description>
+
+  <properties>
+    <hbase.version>1.3.0</hbase.version>
+    <hbase.guava.version>12.0.1</hbase.guava.version>
+    <hbase.protobuf.version>2.5.0</hbase.protobuf.version>
+    <hadoop.version>2.5.1</hadoop.version>
+  </properties>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-shade-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>bundle-and-repackage</id>
+              <phase>package</phase>
+              <goals>
+                <goal>shade</goal>
+              </goals>
+              <configuration>
+                <createDependencyReducedPom>false</createDependencyReducedPom>
+                <artifactSet>
+                  <includes>
+                    <include>com.google.guava:guava</include>
+                    <include>com.google.protobuf:protobuf-java</include>
+                  </includes>
+                </artifactSet>
+                <relocations>
+                  <relocation>
+                    <pattern>com.google.common</pattern>
+                    
<shadedPattern>org.apache.beam.sdk.io.hbase.repackaged.com.google.common</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.google.thirdparty</pattern>
+                    
<shadedPattern>org.apache.beam.sdk.io.hbase.repackaged.com.google.thirdparty</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.google.protobuf</pattern>
+                    
<shadedPattern>org.apache.beam.sdk.io.hbase.repackaged.com.google.protobuf</shadedPattern>
+                  </relocation>
+                </relocations>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <!-- exclude beam version of protobuf -->
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-lite</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${hbase.guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+      <version>${hbase.protobuf.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop2-compat</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
new file mode 100644
index 0000000..3b5f4da
--- /dev/null
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteStringCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hbase.coders.HBaseMutationCoder;
+import org.apache.beam.sdk.io.hbase.coders.HBaseResultCoder;
+import org.apache.beam.sdk.io.hbase.coders.SerializableConfiguration;
+import org.apache.beam.sdk.io.hbase.coders.SerializableScan;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded source and sink for HBase.
+ *
+ * <p>For more information, see the online documentation at
+ * <a href="https://hbase.apache.org/">HBase</a>.
+ *
+ * <h3>Reading from HBase</h3>
+ *
+ * <p>The HBase source returns a set of rows from a single table, returning a
+ * {@code PCollection<Result>}.
+ *
+ * <p>To configure a HBase source, you must supply a table id and a {@link 
Configuration}
+ * to identify the HBase instance. By default, {@link HBaseIO.Read} will read 
all rows in the
+ * table. The row range to be read can optionally be restricted with a 
{@link Scan} object
+ * or using the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using
+ * {@link HBaseIO.Read#withFilter}, for example:
+ *
+ * <pre>{@code
+ * // Scan the entire table.
+ * p.apply("read",
+ *     HBaseIO.read()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"));
+ *
+ * // Filter data using a HBaseIO Scan
+ * Scan scan = ...
+ * p.apply("read",
+ *     HBaseIO.read()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"))
+ *         .withScan(scan));
+ *
+ * // Scan a prefix of the table.
+ * ByteKeyRange keyRange = ...;
+ * p.apply("read",
+ *     HBaseIO.read()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table")
+ *         .withKeyRange(keyRange));
+ *
+ * // Scan a subset of rows that match the specified row filter.
+ * p.apply("filtered read",
+ *     HBaseIO.read()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table")
+ *         .withFilter(filter));
+ * }</pre>
+ *
+ * <h3>Writing to HBase</h3>
+ *
+ * <p>The HBase sink executes a set of row mutations on a single table. It 
takes as input a
+ * {@link PCollection PCollection&lt;KV&lt;ByteString, 
Iterable&lt;Mutation&gt;&gt;&gt;}, where the
+ * {@link ByteString} is the key of the row being mutated, and each {@link 
Mutation} represents an
+ * idempotent transformation to that row.
+ *
+ * <p>To configure a HBase sink, you must supply a table id and a {@link 
Configuration}
+ * to identify the HBase instance, for example:
+ *
+ * <pre>{@code
+ * Configuration configuration = ...;
+ * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
+ * data.setCoder(HBaseIO.WRITE_CODER);
+ *
+ * data.apply("write",
+ *     HBaseIO.write()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * <p>The design of the API for HBaseIO is currently related to the BigtableIO 
one,
+ * it can evolve or be different in some aspects, but the idea is that users 
can easily migrate
+ * from one to the other.
+ */
+@Experimental
+public class HBaseIO {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);
+
+    /** Disallow construction of utility class. */
+    private HBaseIO() {
+    }
+
+    /**
+     * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code 
Read} must be
+     * initialized with a
+     * {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies
+     * the HBase instance, and a {@link HBaseIO.Read#withTableId tableId} that
+     * specifies which table to read. A {@link Filter} may also optionally be 
specified using
+     * {@link HBaseIO.Read#withFilter}.
+     */
+    @Experimental
+    public static Read read() {
+        // The null configuration and empty tableId are placeholders;
+        // Read#validate rejects them if the with* methods are never called.
+        return new Read(null, "", new SerializableScan(new Scan()));
+    }
+
+    /**
+     * A {@link PTransform} that reads from HBase. See the class-level Javadoc 
on
+     * {@link HBaseIO} for more information.
+     *
+     * @see HBaseIO
+     */
+    public static class Read extends PTransform<PBegin, PCollection<Result>> {
+        /**
+         * Returns a new {@link HBaseIO.Read} that will read from the HBase 
instance
+         * indicated by the given configuration.
+         */
+        public Read withConfiguration(Configuration configuration) {
+            checkNotNull(configuration, "conf");
+            return new Read(new SerializableConfiguration(configuration),
+                    tableId, serializableScan);
+        }
+
+        /**
+         * Returns a new {@link HBaseIO.Read} that will read from the 
specified table.
+         *
+         * <p>Does not modify this object.
+         */
+        public Read withTableId(String tableId) {
+            checkNotNull(tableId, "tableId");
+            return new Read(serializableConfiguration, tableId, 
serializableScan);
+        }
+
+        /**
+         * Returns a new {@link HBaseIO.Read} that will filter the rows read 
from HBase
+         * using the given scan.
+         *
+         * <p>Does not modify this object.
+         */
+        public Read withScan(Scan scan) {
+            checkNotNull(scan, "scan");
+            return new Read(serializableConfiguration, tableId, new 
SerializableScan(scan));
+        }
+
+        /**
+         * Returns a new {@link HBaseIO.Read} that will filter the rows read 
from HBase
+         * using the given row filter.
+         *
+         * <p>Does not modify this object.
+         */
+        public Read withFilter(Filter filter) {
+            checkNotNull(filter, "filter");
+            // NOTE(review): setFilter mutates the Scan instance already held
+            // by this Read (no copy is made before the new SerializableScan
+            // wraps it), so this object IS modified, contradicting the
+            // "Does not modify this object" contract -- consider cloning the
+            // Scan first.
+            return withScan(serializableScan.getScan().setFilter(filter));
+        }
+
+        /**
+         * Returns a new {@link HBaseIO.Read} that will read only rows in the 
specified range.
+         *
+         * <p>Does not modify this object.
+         */
+        public Read withKeyRange(ByteKeyRange keyRange) {
+            checkNotNull(keyRange, "keyRange");
+            byte[] startRow = keyRange.getStartKey().getBytes();
+            byte[] stopRow = keyRange.getEndKey().getBytes();
+            // NOTE(review): setStartRow/setStopRow mutate the Scan shared
+            // with this Read instance (see withFilter above) -- same
+            // "Does not modify this object" violation.
+            return 
withScan(serializableScan.getScan().setStartRow(startRow).setStopRow(stopRow));
+        }
+
+        /**
+         * Returns a new {@link HBaseIO.Read} that will read only rows in the 
specified range.
+         *
+         * <p>Does not modify this object.
+         */
+        public Read withKeyRange(byte[] startRow, byte[] stopRow) {
+            checkNotNull(startRow, "startRow");
+            checkNotNull(stopRow, "stopRow");
+            ByteKeyRange keyRange =
+                    ByteKeyRange.of(ByteKey.copyFrom(startRow), 
ByteKey.copyFrom(stopRow));
+            return withKeyRange(keyRange);
+        }
+
+        private Read(SerializableConfiguration serializableConfiguration, 
String tableId,
+                     SerializableScan serializableScan) {
+            this.serializableConfiguration = serializableConfiguration;
+            this.tableId = tableId;
+            this.serializableScan = serializableScan;
+        }
+
+        @Override
+        public PCollection<Result> expand(PBegin input) {
+            HBaseSource source = new HBaseSource(this, null /* 
estimatedSizeBytes */);
+            return 
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+        }
+
+        @Override
+        public void validate(PBegin input) {
+            checkArgument(serializableConfiguration != null,
+                    "Configuration not provided");
+            checkArgument(!tableId.isEmpty(), "Table ID not specified");
+            // Best-effort existence check: a connectivity problem here only
+            // logs a warning rather than failing pipeline construction.
+            try (Connection connection = ConnectionFactory.createConnection(
+                    serializableConfiguration.getConfiguration())) {
+                Admin admin = connection.getAdmin();
+                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
+                        "Table %s does not exist", tableId);
+            } catch (IOException e) {
+                LOG.warn("Error checking whether table {} exists; 
proceeding.", tableId, e);
+            }
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            super.populateDisplayData(builder);
+            builder.add(DisplayData.item("configuration",
+                    serializableConfiguration.getConfiguration().toString()));
+            builder.add(DisplayData.item("tableId", tableId));
+            builder.addIfNotNull(DisplayData.item("scan", 
serializableScan.getScan().toString()));
+        }
+
+        public String getTableId() {
+            return tableId;
+        }
+
+        public Configuration getConfiguration() {
+            return serializableConfiguration.getConfiguration();
+        }
+
+        /**
+         * Returns the range of keys that will be read from the table.
+         */
+        public ByteKeyRange getKeyRange() {
+            byte[] startRow = serializableScan.getScan().getStartRow();
+            byte[] stopRow = serializableScan.getScan().getStopRow();
+            return ByteKeyRange.of(ByteKey.copyFrom(startRow), 
ByteKey.copyFrom(stopRow));
+        }
+
+        // Only assigned in the constructor; effectively immutable, though not
+        // declared final.
+        private SerializableConfiguration serializableConfiguration;
+        private String tableId;
+        private SerializableScan serializableScan;
+    }
+
+    static class HBaseSource extends BoundedSource<Result> {
+        private Read read;
+        // Memoized size estimate; null until computed or supplied by the
+        // parent source when splitting.
+        @Nullable private Long estimatedSizeBytes;
+
+        HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
+            this.read = read;
+            this.estimatedSizeBytes = estimatedSizeBytes;
+        }
+
+        @Override
+        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) 
throws Exception {
+            if (estimatedSizeBytes == null) {
+                estimatedSizeBytes = estimateSizeBytes();
+                LOG.debug("Estimated size {} bytes for table {} and scan {}", 
estimatedSizeBytes,
+                        read.tableId, read.serializableScan.getScan());
+            }
+            return estimatedSizeBytes;
+        }
+
+        /**
+         * This estimates the real size, it can be the compressed size 
depending on the HBase
+         * configuration.
+         */
+        private long estimateSizeBytes() throws Exception {
+            // This code is based on RegionSizeCalculator in hbase-server
+            long estimatedSizeBytes = 0L;
+            Configuration configuration = 
this.read.serializableConfiguration.getConfiguration();
+            try (Connection connection = 
ConnectionFactory.createConnection(configuration)) {
+                // filter regions for the given table/scan
+                List<HRegionLocation> regionLocations = 
getRegionLocations(connection);
+
+                // builds set of regions who are part of the table scan
+                Set<byte[]> tableRegions = new 
TreeSet<>(Bytes.BYTES_COMPARATOR);
+                for (HRegionLocation regionLocation : regionLocations) {
+                    
tableRegions.add(regionLocation.getRegionInfo().getRegionName());
+                }
+
+                // calculate estimated size for the regions
+                Admin admin = connection.getAdmin();
+                ClusterStatus clusterStatus = admin.getClusterStatus();
+                Collection<ServerName> servers = clusterStatus.getServers();
+                for (ServerName serverName : servers) {
+                    ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+                    for (RegionLoad regionLoad : 
serverLoad.getRegionsLoad().values()) {
+                        byte[] regionId = regionLoad.getName();
+                        if (tableRegions.contains(regionId)) {
+                            // RegionLoad reports store file size in MB, so
+                            // convert to bytes (1 MB = 1_048_576 bytes).
+                            long regionSizeBytes = 
regionLoad.getStorefileSizeMB() * 1_048_576L;
+                            estimatedSizeBytes += regionSizeBytes;
+                        }
+                    }
+                }
+            }
+            return estimatedSizeBytes;
+        }
+
+        private List<HRegionLocation> getRegionLocations(Connection 
connection) throws Exception {
+            final Scan scan = read.serializableScan.getScan();
+            byte[] startRow = scan.getStartRow();
+            byte[] stopRow = scan.getStopRow();
+
+            final List<HRegionLocation> regionLocations = new ArrayList<>();
+
+            // An empty start/stop row means the scan is unbounded on that
+            // side.
+            final boolean scanWithNoLowerBound = startRow.length == 0;
+            final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+            TableName tableName = TableName.valueOf(read.tableId);
+            RegionLocator regionLocator = 
connection.getRegionLocator(tableName);
+            List<HRegionLocation> tableRegionInfos = 
regionLocator.getAllRegionLocations();
+            for (HRegionLocation regionLocation : tableRegionInfos) {
+                final byte[] startKey = 
regionLocation.getRegionInfo().getStartKey();
+                final byte[] endKey = 
regionLocation.getRegionInfo().getEndKey();
+                boolean isLastRegion = endKey.length == 0;
+                // filters regions who are part of the scan
+                if ((scanWithNoLowerBound
+                        || isLastRegion || Bytes.compareTo(startRow, endKey) < 
0)
+                        && (scanWithNoUpperBound || Bytes.compareTo(stopRow, 
startKey) > 0)) {
+                    regionLocations.add(regionLocation);
+                }
+            }
+
+            return regionLocations;
+        }
+
+        private List<HBaseSource>
+            splitBasedOnRegions(List<HRegionLocation> regionLocations, int 
numSplits)
+                throws Exception {
+            final Scan scan = read.serializableScan.getScan();
+            byte[] startRow = scan.getStartRow();
+            byte[] stopRow = scan.getStopRow();
+
+            // numSplits is only used as a capacity hint here; one source is
+            // created per region location regardless.
+            final List<HBaseSource> sources = new ArrayList<>(numSplits);
+            final boolean scanWithNoLowerBound = startRow.length == 0;
+            final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+            for (HRegionLocation regionLocation : regionLocations) {
+                final byte[] startKey = 
regionLocation.getRegionInfo().getStartKey();
+                final byte[] endKey = 
regionLocation.getRegionInfo().getEndKey();
+                boolean isLastRegion = endKey.length == 0;
+                String host = regionLocation.getHostnamePort();
+
+                // Clamp region boundaries to the scan range; the last
+                // region's end key is empty, so it must fall back to the
+                // scan's stopRow.
+                final byte[] splitStart = (scanWithNoLowerBound
+                        || Bytes.compareTo(startKey, startRow) >= 0) ? 
startKey : startRow;
+                final byte[] splitStop =
+                        (scanWithNoUpperBound || Bytes.compareTo(endKey, 
stopRow) <= 0)
+                                && !isLastRegion ? endKey : stopRow;
+
+                LOG.debug("{} {} {} {} {}", sources.size(), host, read.tableId,
+                        Bytes.toString(splitStart), Bytes.toString(splitStop));
+
+                // We need to create a new copy of the scan and read to add 
the new ranges
+                Scan newScan = new 
Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
+                Read newRead = new Read(read.serializableConfiguration, 
read.tableId,
+                        new SerializableScan(newScan));
+                sources.add(new HBaseSource(newRead, estimatedSizeBytes));
+            }
+            return sources;
+        }
+
+        @Override
+        public List<? extends BoundedSource<Result>>
+            splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions 
options)
+                throws Exception {
+            LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
+            long estimatedSizeBytes = getEstimatedSizeBytes(options);
+            int numSplits = 1;
+            if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) {
+                numSplits = (int) Math.ceil((double) estimatedSizeBytes / 
desiredBundleSizeBytes);
+            }
+
+            try (Connection connection = ConnectionFactory.createConnection(
+                    read.getConfiguration())) {
+                List<HRegionLocation> regionLocations = 
getRegionLocations(connection);
+                // Use at least one split per region.
+                int realNumSplits =
+                        numSplits < regionLocations.size() ? 
regionLocations.size() : numSplits;
+                LOG.debug("Suggested {} bundle(s) based on size", numSplits);
+                LOG.debug("Suggested {} bundle(s) based on number of regions",
+                        regionLocations.size());
+                final List<HBaseSource> sources = 
splitBasedOnRegions(regionLocations,
+                        realNumSplits);
+                LOG.debug("Split into {} bundle(s)", sources.size());
+                // NOTE(review): numSplits is initialized to 1 and never
+                // decreased, so this condition is always true and the
+                // singletonList(this) fallback below is unreachable --
+                // confirm whether 'numSplits > 1' was intended.
+                if (numSplits >= 1) {
+                    return sources;
+                }
+                return Collections.singletonList(this);
+            }
+        }
+
+        @Override
+        public BoundedReader<Result> createReader(PipelineOptions 
pipelineOptions)
+                throws IOException {
+            return new HBaseReader(this);
+        }
+
+        @Override
+        public void validate() {
+            read.validate(null /* input */);
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            read.populateDisplayData(builder);
+        }
+
+        @Override
+        public Coder<Result> getDefaultOutputCoder() {
+            return HBaseResultCoder.of();
+        }
+    }
+
+    private static class HBaseReader extends 
BoundedSource.BoundedReader<Result> {
+        private final HBaseSource source;
+        private Connection connection;
+        private ResultScanner scanner;
+        private Iterator<Result> iter;
+        private Result result;
+        private long recordsReturned;
+
+        HBaseReader(HBaseSource source) {
+            this.source = source;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+            Configuration configuration = 
source.read.serializableConfiguration.getConfiguration();
+            String tableId = source.read.tableId;
+            connection = ConnectionFactory.createConnection(configuration);
+            TableName tableName = TableName.valueOf(tableId);
+            Table table = connection.getTable(tableName);
+            Scan scan = source.read.serializableScan.getScan();
+            // NOTE(review): the Table is never closed explicitly; presumably
+            // its resources are released when the Connection is closed in
+            // close() -- confirm against the HBase client docs.
+            scanner = table.getScanner(scan);
+            iter = scanner.iterator();
+            return advance();
+        }
+
+        @Override
+        public Result getCurrent() throws NoSuchElementException {
+            // NOTE(review): returns null (rather than throwing
+            // NoSuchElementException) before start() or after the scan is
+            // exhausted, which diverges from the BoundedReader contract --
+            // confirm.
+            return result;
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+            boolean hasRecord = iter.hasNext();
+            if (hasRecord) {
+                result = iter.next();
+                ++recordsReturned;
+            }
+            return hasRecord;
+        }
+
+        @Override
+        public void close() throws IOException {
+            LOG.debug("Closing reader after reading {} records.", 
recordsReturned);
+            if (scanner != null) {
+                scanner.close();
+                scanner = null;
+            }
+            if (connection != null) {
+                connection.close();
+                connection = null;
+            }
+        }
+
+        @Override
+        public BoundedSource<Result> getCurrentSource() {
+            return source;
+        }
+    }
+
+    /**
+     * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code 
Write} must be
+     * initialized with a
+     * {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies
+     * the destination HBase instance, and a {@link HBaseIO.Write#withTableId 
tableId}
+     * that specifies which table to write.
+     */
+    public static Write write() {
+        return new Write(null /* SerializableConfiguration */, "");
+    }
+
+    /**
+     * A {@link PTransform} that writes to HBase. See the class-level Javadoc 
on
+     * {@link HBaseIO} for more information.
+     *
+     * @see HBaseIO
+     */
+    public static class Write
+            extends PTransform<PCollection<KV<ByteString, 
Iterable<Mutation>>>, PDone> {
+
+        /**
+         * Returns a new {@link HBaseIO.Write} that will write to the HBase 
instance
+         * indicated by the given Configuration, and using any other specified 
customizations.
+         *
+         * <p>Does not modify this object.
+         */
+        public Write withConfiguration(Configuration configuration) {
+            checkNotNull(configuration, "conf");
+            return new Write(new SerializableConfiguration(configuration), 
tableId);
+        }
+
+        /**
+         * Returns a new {@link HBaseIO.Write} that will write to the 
specified table.
+         *
+         * <p>Does not modify this object.
+         */
+        public Write withTableId(String tableId) {
+            checkNotNull(tableId, "tableId");
+            return new Write(serializableConfiguration, tableId);
+        }
+
+        private Write(SerializableConfiguration serializableConfiguration, 
String tableId) {
+            this.serializableConfiguration = serializableConfiguration;
+            this.tableId = tableId;
+        }
+
+        @Override
+        public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> 
input) {
+            input.apply(ParDo.of(new HBaseWriterFn(tableId, 
serializableConfiguration)));
+            return PDone.in(input.getPipeline());
+        }
+
+        @Override
+        public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> 
input) {
+            checkArgument(serializableConfiguration != null, "Configuration 
not specified");
+            checkArgument(!tableId.isEmpty(), "Table ID not specified");
+            try (Connection connection = ConnectionFactory.createConnection(
+                    serializableConfiguration.getConfiguration())) {
+                Admin admin = connection.getAdmin();
+                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
+                        "Table %s does not exist", tableId);
+            } catch (IOException e) {
+                LOG.warn("Error checking whether table {} exists; 
proceeding.", tableId, e);
+            }
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            super.populateDisplayData(builder);
+            builder.add(DisplayData.item("configuration",
+                    serializableConfiguration.getConfiguration().toString()));
+            builder.add(DisplayData.item("tableId", tableId));
+        }
+
+        public String getTableId() {
+            return tableId;
+        }
+
+        public Configuration getConfiguration() {
+            return serializableConfiguration.getConfiguration();
+        }
+
+        private final String tableId;
+        private final SerializableConfiguration serializableConfiguration;
+
+        private class HBaseWriterFn extends DoFn<KV<ByteString, 
Iterable<Mutation>>, Void> {
+
+            public HBaseWriterFn(String tableId,
+                                 SerializableConfiguration 
serializableConfiguration) {
+                this.tableId = checkNotNull(tableId, "tableId");
+                this.serializableConfiguration = 
checkNotNull(serializableConfiguration,
+                        "serializableConfiguration");
+            }
+
+            @Setup
+            public void setup() throws Exception {
+                Configuration configuration = 
this.serializableConfiguration.getConfiguration();
+                connection = ConnectionFactory.createConnection(configuration);
+
+                TableName tableName = TableName.valueOf(tableId);
+                BufferedMutatorParams params =
+                    new BufferedMutatorParams(tableName);
+                mutator = connection.getBufferedMutator(params);
+
+                recordsWritten = 0;
+            }
+
+            @StartBundle
+            public void startBundle(Context c) throws Exception {
+
+            }
+
+            @ProcessElement
+            public void processElement(ProcessContext ctx) throws Exception {
+                KV<ByteString, Iterable<Mutation>> record = ctx.element();
+                List<Mutation> mutations = new ArrayList<>();
+                for (Mutation mutation : record.getValue()) {
+                    mutations.add(mutation);
+                    ++recordsWritten;
+                }
+                mutator.mutate(mutations);
+            }
+
+            @FinishBundle
+            public void finishBundle(Context c) throws Exception {
+                mutator.flush();
+            }
+
+            @Teardown
+            public void tearDown() throws Exception {
+                if (mutator != null) {
+                    mutator.close();
+                    mutator = null;
+                }
+                if (connection != null) {
+                    connection.close();
+                    connection = null;
+                }
+                LOG.debug("Wrote {} records", recordsWritten);
+            }
+
+            @Override
+            public void populateDisplayData(DisplayData.Builder builder) {
+                builder.delegate(Write.this);
+            }
+
+            private final String tableId;
+            private final SerializableConfiguration serializableConfiguration;
+
+            private Connection connection;
+            private BufferedMutator mutator;
+
+            private long recordsWritten;
+        }
+    }
+
    /** Default coder for the write input: ByteString row keys mapped to groups of mutations. */
    public static final Coder<KV<ByteString, Iterable<Mutation>>> WRITE_CODER =
            KvCoder.of(ByteStringCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java
new file mode 100644
index 0000000..a99a943
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+/**
+ * A {@link Coder} that serializes and deserializes the {@link Mutation} 
objects using {@link
+ * ProtobufUtil}.
+ */
+public class HBaseMutationCoder extends AtomicCoder<Mutation> implements 
Serializable {
+  private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder();
+
+  private HBaseMutationCoder() {}
+
+  public static HBaseMutationCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(Mutation mutation, OutputStream outStream,
+                     Coder.Context context) throws IOException {
+    MutationType type = getType(mutation);
+    MutationProto proto = ProtobufUtil.toMutation(type, mutation);
+    proto.writeDelimitedTo(outStream);
+  }
+
+  @Override
+  public Mutation decode(InputStream inStream,
+                         Coder.Context context) throws IOException {
+    return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream));
+  }
+
+  private static MutationType getType(Mutation mutation) {
+    if (mutation instanceof Put) {
+      return MutationType.PUT;
+    } else if (mutation instanceof Delete) {
+      return MutationType.DELETE;
+    } else {
+      // Increment and Append are not idempotent.  They should not be used in 
distributed jobs.
+      throw new IllegalArgumentException("Only Put and Delete are supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java
new file mode 100644
index 0000000..f10a517
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * A {@link Coder} that serializes and deserializes the {@link Result} objects 
using {@link
+ * ProtobufUtil}.
+ */
+public class HBaseResultCoder extends AtomicCoder<Result> implements 
Serializable {
+
+  private static final HBaseResultCoder INSTANCE = new HBaseResultCoder();
+
+  public static HBaseResultCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public Result decode(InputStream inputStream, Coder.Context context)
+      throws IOException {
+    return 
ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
+  }
+
+  @Override
+  public void encode(Result value, OutputStream outputStream, Coder.Context 
context)
+      throws IOException {
+    ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
new file mode 100644
index 0000000..de479de
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.coders;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This is just a wrapper class to serialize Hadoop/HBase {@link 
Configuration}.
+ */
+public class SerializableConfiguration implements Serializable {
+    private transient Configuration conf;
+
+    public SerializableConfiguration(Configuration conf) {
+        this.conf = conf;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        conf.write(out);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException {
+        conf = new Configuration(false);
+        conf.readFields(in);
+    }
+
+    public Configuration getConfiguration() {
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java
new file mode 100644
index 0000000..96beff9
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.coders;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * This is just a wrapper class to serialize HBase {@link Scan}.
+ */
+public class SerializableScan implements Serializable {
+    private transient Scan scan;
+
+    public SerializableScan(Scan scan) {
+        this.scan = scan;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        ProtobufUtil.toScan(scan).writeDelimitedTo(out);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException {
+        scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
+    }
+
+    public Scan getScan() {
+        return scan;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java
new file mode 100644
index 0000000..d21b927
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines coders used while reading and writing from/to HBase.
+ *
+ * @see org.apache.beam.sdk.io.hbase.HBaseIO
+ */
+package org.apache.beam.sdk.io.hbase.coders;

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java
new file mode 100644
index 0000000..95c1ea7
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines transforms for reading and writing from HBase.
+ *
+ * @see org.apache.beam.sdk.io.hbase.HBaseIO
+ */
+package org.apache.beam.sdk.io.hbase;

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
new file mode 100644
index 0000000..d3bc4dc
--- /dev/null
+++ 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import com.google.protobuf.ByteString;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hbase.HBaseIO.HBaseSource;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.hamcrest.Matchers;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test HBaseIO.
+ */
+@RunWith(JUnit4.class)
+public class HBaseIOTest {
+    @Rule public final transient TestPipeline p = TestPipeline.create();
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    private static HBaseTestingUtility htu;
+    private static HBaseAdmin admin;
+
+    private static Configuration conf = HBaseConfiguration.create();
+    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
+    private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email");
+
    @BeforeClass
    public static void beforeClass() throws Exception {
        // Fail fast in tests instead of retrying against the mini-cluster.
        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
        // Try to bind the hostname to localhost to solve an issue when it is not configured or
        // no DNS resolution available.
        conf.setStrings("hbase.master.hostname", "localhost");
        conf.setStrings("hbase.regionserver.hostname", "localhost");
        htu = new HBaseTestingUtility(conf);
        // One master and four region servers.
        htu.startMiniCluster(1, 4);
        admin = htu.getHBaseAdmin();
    }
+
    @AfterClass
    public static void afterClass() throws Exception {
        // Close the admin handle before tearing down the cluster it talks to.
        if (admin != null) {
            admin.close();
            admin = null;
        }
        if (htu != null) {
            htu.shutdownMiniCluster();
            htu = null;
        }
    }
+
+    @Test
+    public void testReadBuildsCorrectly() {
+        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId("table");
+        assertEquals("table", read.getTableId());
+        assertNotNull("configuration", read.getConfiguration());
+    }
+
+    @Test
+    public void testReadBuildsCorrectlyInDifferentOrder() {
+        HBaseIO.Read read = 
HBaseIO.read().withTableId("table").withConfiguration(conf);
+        assertEquals("table", read.getTableId());
+        assertNotNull("configuration", read.getConfiguration());
+    }
+
+    @Test
+    public void testWriteBuildsCorrectly() {
+        HBaseIO.Write write = 
HBaseIO.write().withConfiguration(conf).withTableId("table");
+        assertEquals("table", write.getTableId());
+        assertNotNull("configuration", write.getConfiguration());
+    }
+
+    @Test
+    public void testWriteBuildsCorrectlyInDifferentOrder() {
+        HBaseIO.Write write = 
HBaseIO.write().withTableId("table").withConfiguration(conf);
+        assertEquals("table", write.getTableId());
+        assertNotNull("configuration", write.getConfiguration());
+    }
+
+    @Test
+    public void testWriteValidationFailsMissingTable() {
+        HBaseIO.Write write = HBaseIO.write().withConfiguration(conf);
+        thrown.expect(IllegalArgumentException.class);
+        write.validate(null /* input */);
+    }
+
+    @Test
+    public void testWriteValidationFailsMissingConfiguration() {
+        HBaseIO.Write write = HBaseIO.write().withTableId("table");
+        thrown.expect(IllegalArgumentException.class);
+        write.validate(null /* input */);
+    }
+
+    /** Tests that when reading from a non-existent table, the read fails. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadingFailsTableDoesNotExist() throws Exception {
+        final String table = "TEST-TABLE-INVALID";
+        // Exception will be thrown by read.validate() when read is applied.
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(String.format("Table %s does not exist", table));
+        runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table),
+                new ArrayList<Result>());
+    }
+
+    /** Tests that when reading from an empty table, the read succeeds. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadingEmptyTable() throws Exception {
+        final String table = "TEST-EMPTY-TABLE";
+        createTable(table);
+        runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table),
+                new ArrayList<Result>());
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReading() throws Exception {
+        final String table = "TEST-MANY-ROWS-TABLE";
+        final int numRows = 1001;
+        createTable(table);
+        writeData(table, numRows);
+        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), 
1001);
+    }
+
+    /** Tests reading all rows from a split table. */
+    @Test
+    public void testReadingWithSplits() throws Exception {
+        final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+        final int numRows = 1500;
+        final int numRegions = 4;
+        final long bytesPerRow = 100L;
+
+        // Set up test table data and sample row keys for size estimation and 
splitting.
+        createTable(table);
+        writeData(table, numRows);
+
+        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId(table);
+        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes 
*/);
+        List<? extends BoundedSource<Result>> splits =
+                source.splitIntoBundles(numRows * bytesPerRow / numRegions,
+                        null /* options */);
+
+        // Test num splits and split equality.
+        assertThat(splits, hasSize(4));
+        assertSourcesEqualReferenceSource(source, splits, null /* options */);
+    }
+
+
+    /** Tests reading all rows using a filter. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadingWithFilter() throws Exception {
+        final String table = "TEST-FILTER-TABLE";
+        final int numRows = 1001;
+
+        createTable(table);
+        writeData(table, numRows);
+
+        String regex = ".*17.*";
+        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
+                new RegexStringComparator(regex));
+        HBaseIO.Read read =
+                
HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter);
+        runReadTestLength(read, 20);
+    }
+
+    /**
+     * Tests reading all rows using key ranges. Tests a prefix [), a suffix 
(], and a restricted
+     * range [] and that some properties hold across them.
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadingWithKeyRange() throws Exception {
+        final String table = "TEST-KEY-RANGE-TABLE";
+        final int numRows = 1001;
+        final byte[] startRow = "2".getBytes();
+        final byte[] stopRow = "9".getBytes();
+        final ByteKey startKey = ByteKey.copyFrom(startRow);
+
+        createTable(table);
+        writeData(table, numRows);
+
+        // Test prefix: [beginning, startKey).
+        final ByteKeyRange prefixRange = 
ByteKeyRange.ALL_KEYS.withEndKey(startKey);
+        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
+                .withKeyRange(prefixRange), 126);
+
+        // Test suffix: [startKey, end).
+        final ByteKeyRange suffixRange = 
ByteKeyRange.ALL_KEYS.withStartKey(startKey);
+        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
+                .withKeyRange(suffixRange), 875);
+
+        // Test restricted range: [startKey, endKey).
+        // This one tests the second signature of .withKeyRange
+        
runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
+                .withKeyRange(startRow, stopRow), 441);
+    }
+
+    @Test
+    public void testReadingDisplayData() {
+        HBaseIO.Read read = 
HBaseIO.read().withConfiguration(conf).withTableId("fooTable");
+        DisplayData displayData = DisplayData.from(read);
+        assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+        assertThat(displayData, hasDisplayItem("configuration"));
+    }
+
+    /** Tests that a record gets written to the service and messages are 
logged. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriting() throws Exception {
+        final String table = "table";
+        final String key = "key";
+        final String value = "value";
+
+        createTable(table);
+
+        p.apply("single row", Create.of(makeWrite(key, 
value)).withCoder(HBaseIO.WRITE_CODER))
+                .apply("write", 
HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.run().waitUntilFinish();
+
+        List<Result> results = readTable(table, new Scan());
+        assertEquals(1, results.size());
+    }
+
+    /** Tests that when writing to a non-existent table, the write fails. */
+    @Test
+    public void testWritingFailsTableDoesNotExist() throws Exception {
+        final String table = "TEST-TABLE";
+
+        PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+                p.apply(Create.empty(HBaseIO.WRITE_CODER));
+
+        // Exception will be thrown by write.validate() when write is applied.
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(String.format("Table %s does not exist", table));
+
+        emptyInput.apply("write", 
HBaseIO.write().withConfiguration(conf).withTableId(table));
+    }
+
+    /** Tests that when writing an element fails, the write fails. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWritingFailsBadElement() throws Exception {
+        final String table = "TEST-TABLE";
+        final String key = "KEY";
+        createTable(table);
+
+        p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER))
+                
.apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+
+        thrown.expect(Pipeline.PipelineExecutionException.class);
+        
thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
+        thrown.expectMessage("No columns to insert");
+        p.run().waitUntilFinish();
+    }
+
+    @Test
+    public void testWritingDisplayData() {
+        HBaseIO.Write write = 
HBaseIO.write().withTableId("fooTable").withConfiguration(conf);
+        DisplayData displayData = DisplayData.from(write);
+        assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    }
+
+    // HBase helper methods
+    private static void createTable(String tableId) throws Exception {
+        byte[][] splitKeys = {"4".getBytes(), "8".getBytes(), "C".getBytes()};
+        createTable(tableId, COLUMN_FAMILY, splitKeys);
+    }
+
+    private static void createTable(String tableId, byte[] columnFamily, 
byte[][] splitKeys)
+            throws Exception {
+        TableName tableName = TableName.valueOf(tableId);
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        HColumnDescriptor colDef = new HColumnDescriptor(columnFamily);
+        desc.addFamily(colDef);
+        admin.createTable(desc, splitKeys);
+    }
+
+    /**
+     * Helper function to create a table and return the rows that it created.
+     */
+    private static void writeData(String tableId, int numRows) throws 
Exception {
+        Connection connection = admin.getConnection();
+        TableName tableName = TableName.valueOf(tableId);
+        BufferedMutator mutator = connection.getBufferedMutator(tableName);
+        List<Mutation> mutations = makeTableData(numRows);
+        mutator.mutate(mutations);
+        mutator.flush();
+        mutator.close();
+    }
+
+    private static List<Mutation> makeTableData(int numRows) {
+        List<Mutation> mutations = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; ++i) {
+            // We pad values in hex order 0,1, ... ,F,0, ...
+            String prefix = String.format("%X", i % 16);
+            // This 21 is to have a key longer than an input
+            byte[] rowKey = Bytes.toBytes(
+                    StringUtils.leftPad("_" + String.valueOf(i), 21, prefix));
+            byte[] value = Bytes.toBytes(String.valueOf(i));
+            byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + 
"@email.com");
+            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, 
COLUMN_NAME, value));
+            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, 
COLUMN_EMAIL, valueEmail));
+        }
+        return mutations;
+    }
+
    /**
     * Opens a scanner over {@code tableId} for {@code scan}.
     *
     * <p>NOTE(review): the Connection created here is never closed — it must outlive the
     * returned scanner, so this method cannot close it and callers have no handle to it.
     * Likely acceptable for tests, but worth confirming it does not accumulate resources
     * across many test cases.
     */
    private static ResultScanner scanTable(String tableId, Scan scan) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf(tableId);
        Table table = connection.getTable(tableName);
        return table.getScanner(scan);
    }
+
+    private static List<Result> readTable(String tableId, Scan scan) throws 
Exception {
+        ResultScanner scanner = scanTable(tableId, scan);
+        List<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+        scanner.close();
+        return results;
+    }
+
+    // Beam helper methods
+    /** Helper function to make a single row mutation to be written. */
+    private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, 
String value) {
+        ByteString rowKey = ByteString.copyFromUtf8(key);
+        List<Mutation> mutations = new ArrayList<>();
+        mutations.add(makeMutation(key, value));
+        return KV.of(rowKey, (Iterable<Mutation>) mutations);
+    }
+
+
+    private static Mutation makeMutation(String key, String value) {
+        ByteString rowKey = ByteString.copyFromUtf8(key);
+        return new Put(rowKey.toByteArray())
+                    .addColumn(COLUMN_FAMILY, COLUMN_NAME, 
Bytes.toBytes(value))
+                    .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, 
Bytes.toBytes(value + "@email.com"));
+    }
+
+    private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) 
{
+        Put put = new Put(key.getBytes());
+        List<Mutation> mutations = new ArrayList<>();
+        mutations.add(put);
+        return KV.of(ByteString.copyFromUtf8(key), (Iterable<Mutation>) 
mutations);
+    }
+
+    private void runReadTest(HBaseIO.Read read, List<Result> expected) {
+        final String transformId = read.getTableId() + "_" + 
read.getKeyRange();
+        PCollection<Result> rows = p.apply("Read" + transformId, read);
+        PAssert.that(rows).containsInAnyOrder(expected);
+        p.run().waitUntilFinish();
+    }
+
+    private void runReadTestLength(HBaseIO.Read read, long numElements) {
+        final String transformId = read.getTableId() + "_" + 
read.getKeyRange();
+        PCollection<Result> rows = p.apply("Read" + transformId, read);
+        PAssert.thatSingleton(rows.apply("Count" + transformId,
+                Count.<Result>globally())).isEqualTo(numElements);
+        p.run().waitUntilFinish();
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 70ccf9d..b518b70 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -33,14 +33,15 @@
   (sources and sinks) to consume and produce data from systems.</description>
 
   <modules>
+    <module>elasticsearch</module>
     <module>google-cloud-platform</module>
+    <module>hbase</module>
     <module>hdfs</module>
+    <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>
     <module>kinesis</module>
     <module>mongodb</module>
-    <module>jdbc</module>
-    <module>elasticsearch</module>
     <module>mqtt</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index d39958d..3f5e8cc 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -114,6 +114,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hbase</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-hdfs</artifactId>
     </dependency>
 

Reply via email to