HBASE-15036 Update HBase Spark documentation to include bulk load with thin 
records (Ted Malaska)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb539917
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb539917
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb539917

Branch: refs/heads/hbase-12439
Commit: cb539917a2cb307060b0ea4330a52cd40fe7e150
Parents: 4ca27a6
Author: Jonathan M Hsieh <[email protected]>
Authored: Tue Jan 26 11:02:48 2016 -0800
Committer: Jonathan M Hsieh <[email protected]>
Committed: Tue Jan 26 11:02:48 2016 -0800

----------------------------------------------------------------------
 src/main/asciidoc/_chapters/spark.adoc | 89 +++++++++++++++++++++++++----
 1 file changed, 78 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cb539917/src/main/asciidoc/_chapters/spark.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/spark.adoc 
b/src/main/asciidoc/_chapters/spark.adoc
index 37503e9..b1bdb5d 100644
--- a/src/main/asciidoc/_chapters/spark.adoc
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -210,19 +210,25 @@ to the HBase Connections in the executors
 
 == Bulk Load
 
-Spark bulk load follows very closely to the MapReduce implementation of bulk
-load. In short, a partitioner partitions based on region splits and
+There are two options for bulk loading data into HBase with Spark.  There is 
the
+basic bulk load functionality that will work for cases where your rows have
+millions of columns and cases where your columns are not consolidated and
+partitions before the on the map side of the Spark bulk load process.
+
+There is also a thin record bulk load option with Spark, this second option is
+designed for tables that have less then 10k columns per row.  The advantage
+of this second option is higher throughput and less over all load on the Spark
+shuffle operation.
+
+Both implementations work more or less like the MapReduce bulk load process in
+that a partitioner partitions the rowkeys based on region splits and
 the row keys are sent to the reducers in order, so that HFiles can be written
-out. In Spark terms, the bulk load will be focused around a
-`repartitionAndSortWithinPartitions` followed by a `foreachPartition`.
+out directly from the reduce phase.
 
-The only major difference with the Spark implementation compared to the
-MapReduce implementation is that the column qualifier is included in the 
shuffle
-ordering process. This was done because the MapReduce bulk load implementation
-would have memory issues with loading rows with a large numbers of columns, as 
a
-result of the sorting of those columns being done in the memory of the reducer 
JVM.
-Instead, that ordering is done in the Spark Shuffle, so there should no longer
-be a limit to the number of columns in a row for bulk loading.
+In Spark terms, the bulk load will be implemented around a the Spark
+`repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.
+
+First lets look at an example of using the basic bulk load functionality
 
 .Bulk Loading Example
 ====
@@ -237,6 +243,11 @@ val config = new HBaseConfiguration()
 val hbaseContext = new HBaseContext(sc, config)
 
 val stagingFolder = ...
+val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
+      (Bytes.toBytes("3"),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.b"))), ...
 
 rdd.hbaseBulkLoad(TableName.valueOf(tableName),
   t => {
@@ -290,6 +301,11 @@ val config = new HBaseConfiguration()
 val hbaseContext = new HBaseContext(sc, config)
 
 val stagingFolder = ...
+val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
+      (Bytes.toBytes("3"),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.b"))), ...
 
 val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], 
FamilyHFileWriteOptions]
 val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
@@ -318,6 +334,57 @@ load.doBulkLoad(new Path(stagingFolder.getPath),
 ----
 ====
 
+Now lets look at how you would call the thin record bulk load implementation
+
+.Using thin record bulk load
+====
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+val rdd = sc.parallelize(Array(
+      ("1",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), 
Bytes.toBytes("foo1"))),
+      ("3",
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), 
Bytes.toBytes("foo2.b"))), ...
+
+rdd.hbaseBulkLoadThinRows(hbaseContext,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = t._1
+
+        val familyQualifiersValues = new FamiliesQualifiersValues
+        t._2.foreach(f => {
+          val family:Array[Byte] = f._1
+          val qualifier = f._2
+          val value:Array[Byte] = f._3
+
+          familyQualifiersValues +=(family, qualifier, value)
+        })
+        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
+      },
+      stagingFolder.getPath,
+      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+      compactionExclude = false,
+      20)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+====
+
+Note that the big difference in using bulk load for thin rows is the function
+returns a tuple with the first value being the row key and the second value
+being an object of FamiliesQualifiersValues, which will contain all the
+values for this row for all column families.
+
+
 == SparkSQL/DataFrames
 
 http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports

Reply via email to