Repository: hbase Updated Branches: refs/heads/master 14217cef2 -> b29ce7f11
HBASE-15271 Spark bulk load should write to temporary location and then rename on success. Signed-off-by: Ted Yu <[email protected]> Signed-off-by: Jonathan Hsieh <[email protected]> Signed-off-by: Sean Busbey <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b29ce7f1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b29ce7f1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b29ce7f1 Branch: refs/heads/master Commit: b29ce7f1144e31bce9d9862d25324029def8dbad Parents: 14217ce Author: Ted Malaska <[email protected]> Authored: Tue Mar 8 17:26:29 2016 -0800 Committer: Sean Busbey <[email protected]> Committed: Tue Mar 8 17:28:38 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/spark/HBaseContext.scala | 57 ++++++++++++-------- 1 file changed, 36 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b29ce7f1/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index 61ed3cf..c16d45d 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark import java.net.InetSocketAddress import java.util +import java.util.UUID import javax.management.openmbean.KeyAlreadyExistsException import org.apache.hadoop.hbase.fs.HFileSystem @@ -680,7 +681,7 @@ class HBaseContext(@transient sc: SparkContext, //This will only roll if we have at least one column family file that is //bigger then maxSize and we have finished a given row key if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { - rollWriters(writerMap, + rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude) @@ -690,7 +691,7 @@ class HBaseContext(@transient sc: SparkContext, previousRow = keyFamilyQualifier.rowKey } //We have finished all the data so lets close up the writers - rollWriters(writerMap, + rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude) @@ -830,7 +831,7 @@ class HBaseContext(@transient sc: SparkContext, //This will only roll if we have at least one column family file that is //bigger then maxSize and we have finished a given row key if (rollOverRequested) { - rollWriters(writerMap, + rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude) @@ -844,7 +845,7 @@ class HBaseContext(@transient sc: SparkContext, //If there is no writer for a given column family then //it will get created here. //We have finished all the data so lets close up the writers - rollWriters(writerMap, + rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude) @@ -889,17 +890,15 @@ class HBaseContext(@transient sc: SparkContext, valueOf(familyOptions.dataBlockEncoding)) val hFileContext = contextBuilder.build() - if (null == favoredNodes) { - new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) - .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build()) - } else { - new WriterLength(0, - new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) - .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) - .withFavoredNodes(favoredNodes).build()) - } + //Add a '_' to the file name because this is a unfinished file. A rename will happen + // to remove the '_' when the file is closed. + new WriterLength(0, + new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", ""))) + .withFavoredNodes(favoredNodes).build()) + } /** @@ -1013,13 +1012,15 @@ class HBaseContext(@transient sc: SparkContext, /** * This will roll all Writers + * @param fs Hadoop FileSystem object * @param writerMap HashMap that contains all the writers * @param regionSplitPartitioner The partitioner with knowledge of how the * Region's are split by row key * @param previousRow The last row to fill the HFile ending range metadata * @param compactionExclude The exclude compaction metadata flag for the HFile */ - private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength], + private def rollWriters(fs:FileSystem, + writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength], regionSplitPartitioner: BulkLoadPartitioner, previousRow: Array[Byte], compactionExclude: Boolean): Unit = { @@ -1027,7 +1028,7 @@ class HBaseContext(@transient sc: SparkContext, if (wl.writer != null) { logDebug("Writer=" + wl.writer.getPath + (if (wl.written == 0) "" else ", wrote=" + wl.written)) - closeHFileWriter(wl.writer, + closeHFileWriter(fs, wl.writer, regionSplitPartitioner, previousRow, compactionExclude) @@ -1039,16 +1040,18 @@ class HBaseContext(@transient sc: SparkContext, /** * Function to close an HFile + * @param fs Hadoop FileSystem object * @param w HFile Writer * @param regionSplitPartitioner The partitioner with knowledge of how the * Region's are split by row key * @param previousRow The last row to fill the HFile ending range metadata * @param compactionExclude The exclude compaction metadata flag for the HFile */ - private def closeHFileWriter(w: StoreFile.Writer, - regionSplitPartitioner: BulkLoadPartitioner, - previousRow: Array[Byte], - compactionExclude: Boolean): Unit = { + private def closeHFileWriter(fs:FileSystem, + w: StoreFile.Writer, + regionSplitPartitioner: BulkLoadPartitioner, + previousRow: Array[Byte], + compactionExclude: Boolean): Unit = { if (w != null) { w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())) @@ -1060,6 +1063,18 @@ class HBaseContext(@transient sc: SparkContext, Bytes.toBytes(compactionExclude)) w.appendTrackedTimestampsToMetadata() w.close() + + val srcPath = w.getPath + + //In the new path you will see that we are using substring. This is to + // remove the '_' character in front of the HFile name. '_' is a character + // that will tell HBase that this file shouldn't be included in the bulk load + // This feature is to protect for unfinished HFiles being submitted to HBase + val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1)) + if (!fs.rename(srcPath, newPath)) { + throw new IOException("Unable to rename '" + srcPath + + "' to " + newPath) + } } }
