[ https://issues.apache.org/jira/browse/HBASE-20748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Charles PORROT updated HBASE-20748:
-----------------------------------
    Description: 
The _bulkLoad_ methods of class _org.apache.hadoop.hbase.spark.HBaseContext_ use the system's current time as the version of the cells they bulk-load. This makes the method, and its twin _bulkLoadThinRows_, unusable if you need your own versioning scheme:
{code:java}
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
it.foreach { case (keyFamilyQualifier, cellValue: Array[Byte]) =>
  val wl = writeValueToHFile(
    keyFamilyQualifier.rowKey,
    keyFamilyQualifier.family,
    keyFamilyQualifier.qualifier,
    cellValue,
    nowTimeStamp,
    fs,
    conn,
    localTableName,
    conf,
    familyHFileWriteOptionsMapInternal,
    hfileCompression,
    writerMap,
    stagingDir)
...{code}
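For context, a caller of the existing API has no way to supply a per-cell version, since the _flatMap_ can only emit _(KeyFamilyQualifier, Array[Byte])_ pairs. A minimal sketch of such a call, assuming an existing _SparkContext_ (_sc_) and _HBaseContext_ (_hbaseContext_); the table name, column family, qualifier and staging directory are made up:
{code:java}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.spark.KeyFamilyQualifier
import org.apache.hadoop.hbase.util.Bytes

val rdd = sc.parallelize(Seq(("row1", "value1"), ("row2", "value2")))

// The flatMap can only emit (KeyFamilyQualifier, value) pairs: every cell
// is written with nowTimeStamp, whatever version the data actually carries.
hbaseContext.bulkLoad[(String, String)](
  rdd,
  TableName.valueOf("events"),
  { case (rowKey, value) => Iterator(
      (new KeyFamilyQualifier(Bytes.toBytes(rowKey), Bytes.toBytes("cf"), Bytes.toBytes("q")),
       Bytes.toBytes(value))) },
  "/tmp/bulkload-staging")
{code}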
 

Thus, I propose a third _bulkLoad_ method, based on the original one. Instead of taking an _Iterator[(KeyFamilyQualifier, Array[Byte])]_ as the basis for the writes, the new method would take an _Iterator[(KeyFamilyQualifier, Array[Byte], Long)]_, with the _Long_ being the version.

 

Definition of _bulkLoad_:
{code:java}
def bulkLoad[T](rdd: RDD[T],
                tableName: TableName,
                flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                stagingDir: String,
                familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                compactionExclude: Boolean = false,
                maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):{code}
Definition of the proposed _bulkLoadCustomVersions_ method:
{code:java}
def bulkLoadCustomVersions[T](rdd: RDD[T],
                  tableName: TableName,
                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte], Long)],
                  stagingDir: String,
                  familyHFileWriteOptionsMap:
                    util.Map[Array[Byte], FamilyHFileWriteOptions] =
                    new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):{code}
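For illustration, a sketch of how the proposed method could be called, under the same assumptions as the sketch above (_sc_, _hbaseContext_, and made-up table and column names), with each record carrying its own version:
{code:java}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.spark.KeyFamilyQualifier
import org.apache.hadoop.hbase.util.Bytes

// Each record carries its own version, e.g. an event timestamp
case class Event(rowKey: String, value: String, version: Long)

val events = sc.parallelize(Seq(
  Event("row1", "value1", 1528875000000L),
  Event("row2", "value2", 1528875060000L)))

// The flatMap now emits (KeyFamilyQualifier, value, version) triples,
// so every cell is written with its record's own version.
hbaseContext.bulkLoadCustomVersions[Event](
  events,
  TableName.valueOf("events"),
  (e: Event) => Iterator(
    (new KeyFamilyQualifier(Bytes.toBytes(e.rowKey), Bytes.toBytes("cf"), Bytes.toBytes("q")),
     Bytes.toBytes(e.value),
     e.version)),
  "/tmp/bulkload-staging")
{code}
As with the existing _bulkLoad_, the generated HFiles would still have to be handed to the region servers afterwards (e.g. with _LoadIncrementalHFiles_).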
In case of an illogical version (for instance, a negative one), the method would fall back to the current timestamp.
{code:java}
val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
              keyFamilyQualifier.family,
              keyFamilyQualifier.qualifier,
              cellValue,
              if (version > 0) version else nowTimeStamp,
              fs,
              conn,
              localTableName,
              conf,
              familyHFileWriteOptionsMapInternal,
              hfileCompression,
              writerMap,
              stagingDir){code}
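If the proposal is adopted, the effect is easy to verify with a plain read, since a cell's timestamp is its version. A sketch, assuming an HBase _Configuration_ named _config_ and the hypothetical "events" table from the sketches above:
{code:java}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// After the bulk load, each cell should carry the version we supplied
val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(TableName.valueOf("events"))
val result = table.get(new Get(Bytes.toBytes("row1")))
val cell = result.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("q"))
println(s"version = ${cell.getTimestamp}") // expected: the supplied version, not load time
table.close()
conn.close()
{code}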
See the attached file for the full proposed method.


> HBaseContext bulkLoad: being able to use custom versions
> --------------------------------------------------------
>
>                 Key: HBASE-20748
>                 URL: https://issues.apache.org/jira/browse/HBASE-20748
>             Project: HBase
>          Issue Type: Improvement
>          Components: spark
>            Reporter: Charles PORROT
>            Priority: Major
>              Labels: HBaseContext, bulkload, spark, versions
>         Attachments: bulkLoadCustomVersions.scala
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
