Repository: hbase
Updated Branches:
  refs/heads/master 14217cef2 -> b29ce7f11


HBASE-15271 Spark bulk load should write to temporary location and then rename 
on success.

Signed-off-by: Ted Yu <[email protected]>
Signed-off-by: Jonathan Hsieh <[email protected]>
Signed-off-by: Sean Busbey <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b29ce7f1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b29ce7f1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b29ce7f1

Branch: refs/heads/master
Commit: b29ce7f1144e31bce9d9862d25324029def8dbad
Parents: 14217ce
Author: Ted Malaska <[email protected]>
Authored: Tue Mar 8 17:26:29 2016 -0800
Committer: Sean Busbey <[email protected]>
Committed: Tue Mar 8 17:28:38 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/spark/HBaseContext.scala       | 57 ++++++++++++--------
 1 file changed, 36 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b29ce7f1/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 61ed3cf..c16d45d 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark
 
 import java.net.InetSocketAddress
 import java.util
+import java.util.UUID
 import javax.management.openmbean.KeyAlreadyExistsException
 
 import org.apache.hadoop.hbase.fs.HFileSystem
@@ -680,7 +681,7 @@ class HBaseContext(@transient sc: SparkContext,
         //This will only roll if we have at least one column family file that 
is
         //bigger then maxSize and we have finished a given row key
         if (rollOverRequested && Bytes.compareTo(previousRow, 
keyFamilyQualifier.rowKey) != 0) {
-          rollWriters(writerMap,
+          rollWriters(fs, writerMap,
             regionSplitPartitioner,
             previousRow,
             compactionExclude)
@@ -690,7 +691,7 @@ class HBaseContext(@transient sc: SparkContext,
         previousRow = keyFamilyQualifier.rowKey
       }
       //We have finished all the data so lets close up the writers
-      rollWriters(writerMap,
+      rollWriters(fs, writerMap,
         regionSplitPartitioner,
         previousRow,
         compactionExclude)
@@ -830,7 +831,7 @@ class HBaseContext(@transient sc: SparkContext,
             //This will only roll if we have at least one column family file 
that is
             //bigger then maxSize and we have finished a given row key
             if (rollOverRequested) {
-              rollWriters(writerMap,
+              rollWriters(fs, writerMap,
                 regionSplitPartitioner,
                 previousRow,
                 compactionExclude)
@@ -844,7 +845,7 @@ class HBaseContext(@transient sc: SparkContext,
       //If there is no writer for a given column family then
       //it will get created here.
       //We have finished all the data so lets close up the writers
-      rollWriters(writerMap,
+      rollWriters(fs, writerMap,
         regionSplitPartitioner,
         previousRow,
         compactionExclude)
@@ -889,17 +890,15 @@ class HBaseContext(@transient sc: SparkContext,
       valueOf(familyOptions.dataBlockEncoding))
     val hFileContext = contextBuilder.build()
 
-    if (null == favoredNodes) {
-      new WriterLength(0, new StoreFile.WriterBuilder(conf, new 
CacheConfig(tempConf), fs)
-        
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-        
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
-    } else {
-      new WriterLength(0,
-        new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new 
HFileSystem(fs))
-          
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-          .withFavoredNodes(favoredNodes).build())
-    }
+    //Add a '_' to the file name because this is an unfinished file.  A rename 
will happen
+    // to remove the '_' when the file is closed.
+    new WriterLength(0,
+      new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new 
HFileSystem(fs))
+        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
+        
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+        .withFilePath(new Path(familydir, "_" + 
UUID.randomUUID.toString.replaceAll("-", "")))
+        .withFavoredNodes(favoredNodes).build())
+
   }
 
   /**
@@ -1013,13 +1012,15 @@ class HBaseContext(@transient sc: SparkContext,
 
   /**
    * This will roll all Writers
+   * @param fs                     Hadoop FileSystem object
    * @param writerMap              HashMap that contains all the writers
    * @param regionSplitPartitioner The partitioner with knowledge of how the
    *                               Region's are split by row key
    * @param previousRow            The last row to fill the HFile ending range 
metadata
    * @param compactionExclude      The exclude compaction metadata flag for 
the HFile
    */
-  private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, 
WriterLength],
+  private def rollWriters(fs:FileSystem,
+                          writerMap:mutable.HashMap[ByteArrayWrapper, 
WriterLength],
                   regionSplitPartitioner: BulkLoadPartitioner,
                   previousRow: Array[Byte],
                   compactionExclude: Boolean): Unit = {
@@ -1027,7 +1028,7 @@ class HBaseContext(@transient sc: SparkContext,
       if (wl.writer != null) {
         logDebug("Writer=" + wl.writer.getPath +
           (if (wl.written == 0) "" else ", wrote=" + wl.written))
-        closeHFileWriter(wl.writer,
+        closeHFileWriter(fs, wl.writer,
           regionSplitPartitioner,
           previousRow,
           compactionExclude)
@@ -1039,16 +1040,18 @@ class HBaseContext(@transient sc: SparkContext,
 
   /**
    * Function to close an HFile
+   * @param fs                     Hadoop FileSystem object
    * @param w                      HFile Writer
    * @param regionSplitPartitioner The partitioner with knowledge of how the
    *                               Region's are split by row key
    * @param previousRow            The last row to fill the HFile ending range 
metadata
    * @param compactionExclude      The exclude compaction metadata flag for 
the HFile
    */
-  private def closeHFileWriter(w: StoreFile.Writer,
-            regionSplitPartitioner: BulkLoadPartitioner,
-            previousRow: Array[Byte],
-            compactionExclude: Boolean): Unit = {
+  private def closeHFileWriter(fs:FileSystem,
+                               w: StoreFile.Writer,
+                               regionSplitPartitioner: BulkLoadPartitioner,
+                               previousRow: Array[Byte],
+                               compactionExclude: Boolean): Unit = {
     if (w != null) {
       w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
         Bytes.toBytes(System.currentTimeMillis()))
@@ -1060,6 +1063,18 @@ class HBaseContext(@transient sc: SparkContext,
         Bytes.toBytes(compactionExclude))
       w.appendTrackedTimestampsToMetadata()
       w.close()
+
+      val srcPath = w.getPath
+
+      //In the new path you will see that we are using substring.  This is to
+      // remove the '_' character in front of the HFile name.  '_' is a 
character
+      // that will tell HBase that this file shouldn't be included in the bulk 
load.
+      // This feature is to protect against unfinished HFiles being submitted to 
HBase.
+      val newPath = new Path(w.getPath.getParent, 
w.getPath.getName.substring(1))
+      if (!fs.rename(srcPath, newPath)) {
+        throw new IOException("Unable to rename '" + srcPath +
+          "' to " + newPath)
+      }
     }
   }
 

Reply via email to