[
https://issues.apache.org/jira/browse/HBASE-20748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Charles PORROT updated HBASE-20748:
-----------------------------------
Description:
The _bulkLoad_ methods of _class org.apache.hadoop.hbase.spark.HBaseContext_
use the system's current time as the version of the cells they bulk-load.
This makes _bulkLoad_, and its twin _bulkLoadThinRows_, unusable if you need
to use your own versioning scheme:
{code:java}
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
it.foreach { case (keyFamilyQualifier, cellValue: Array[Byte]) =>
  val wl = writeValueToHFile(
    keyFamilyQualifier.rowKey,
    keyFamilyQualifier.family,
    keyFamilyQualifier.qualifier,
    cellValue,
    nowTimeStamp,
    fs,
    conn,
    localTableName,
    conf,
    familyHFileWriteOptionsMapInternal,
    hfileCompression,
    writerMap,
    stagingDir)
  ...
{code}
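For context, here is a minimal sketch of how the current API is called (the
table name, column family, staging directory and input RDD are hypothetical).
The _flatMap_ function can only emit _(KeyFamilyQualifier, Array[Byte])_
pairs, so there is nowhere to carry a per-cell version:
{code:java}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("bulkload-sketch").setMaster("local[*]"))
val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

// Hypothetical input: (rowKey, value) pairs.
val events = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

hbaseContext.bulkLoad[(String, String)](
  events,
  TableName.valueOf("my_table"),
  { case (rowKey, value) =>
    // Key, family, qualifier and value only: the cell version is
    // forced to the system time (nowTimeStamp) inside bulkLoad.
    Iterator((
      new KeyFamilyQualifier(
        Bytes.toBytes(rowKey), Bytes.toBytes("cf"), Bytes.toBytes("q")),
      Bytes.toBytes(value)))
  },
  "/tmp/hbase-staging")
{code}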
Thus, I propose a third _bulkLoad_ method, based on the original one. Instead
of taking an _Iterator[(KeyFamilyQualifier, Array[Byte])]_ as the basis for
the writes, this new method would take an _Iterator[(KeyFamilyQualifier,
Array[Byte], Long)]_, with the _Long_ being the version.
Definition of _bulkLoad_:
{code:java}
def bulkLoad[T](
    rdd: RDD[T],
    tableName: TableName,
    flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
    stagingDir: String,
    familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
    compactionExclude: Boolean = false,
    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
{code}
Definition of the proposed _bulkLoadCustomVersions_ method:
{code:java}
def bulkLoadCustomVersions[T](
    rdd: RDD[T],
    tableName: TableName,
    flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte], Long)],
    stagingDir: String,
    familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
    compactionExclude: Boolean = false,
    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
{code}
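A call would look like the sketch below (same hypothetical table and Spark
context as above, with each record now carrying an explicit version):
{code:java}
// Hypothetical input: (rowKey, value, version) triples, e.g. with
// event times in epoch milliseconds as the versions.
val versionedEvents = sc.parallelize(Seq(
  ("row1", "v1", 1528880000000L),
  ("row2", "v2", 1528880001000L)))

hbaseContext.bulkLoadCustomVersions[(String, String, Long)](
  versionedEvents,
  TableName.valueOf("my_table"),
  { case (rowKey, value, version) =>
    // The third element of the tuple becomes the cell version.
    Iterator((
      new KeyFamilyQualifier(
        Bytes.toBytes(rowKey), Bytes.toBytes("cf"), Bytes.toBytes("q")),
      Bytes.toBytes(value),
      version))
  },
  "/tmp/hbase-staging")
{code}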
If a record carries an invalid version (for instance, a negative one), the
method would fall back to the current timestamp:
{code:java}
val wl = writeValueToHFile(
  keyFamilyQualifier.rowKey,
  keyFamilyQualifier.family,
  keyFamilyQualifier.qualifier,
  cellValue,
  if (version > 0) version else nowTimeStamp,
  fs,
  conn,
  localTableName,
  conf,
  familyHFileWriteOptionsMapInternal,
  hfileCompression,
  writerMap,
  stagingDir)
{code}
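Equivalently, the guard could be factored into a small helper; this is a
sketch only, not part of the attached file:
{code:java}
// Hypothetical helper: keep the caller's version when it is strictly
// positive, otherwise fall back to the load-time timestamp.
def effectiveVersion(version: Long, nowTimeStamp: Long): Long =
  if (version > 0) version else nowTimeStamp
{code}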
See the attached file _bulkLoadCustomVersions.scala_ for the full proposed
method.
> HBaseContext bulkLoad: being able to use custom versions
> --------------------------------------------------------
>
> Key: HBASE-20748
> URL: https://issues.apache.org/jira/browse/HBASE-20748
> Project: HBase
> Issue Type: Improvement
> Components: spark
> Reporter: Charles PORROT
> Priority: Major
> Labels: HBaseContext, bulkload, spark, versions
> Attachments: bulkLoadCustomVersions.scala
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)