[ https://issues.apache.org/jira/browse/HBASE-20748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515834#comment-16515834 ]
Sean Busbey commented on HBASE-20748: ------------------------------------- Hi! Please upload your contributions using format-patch, e.g. {{git format-patch --stdout origin/master > /some/path/to/HBASE-20748.2.patch}} (where the "2" is an incrementing count for the version of the patch). Alternatively if you find a reviewer who doesn't mind using GitHub linking a PR to this issue will work. Please put the issue in "Patch Available" state once you have either a PR link or a patch uploaded. > HBaseContext bulkLoad: being able to use custom versions > -------------------------------------------------------- > > Key: HBASE-20748 > URL: https://issues.apache.org/jira/browse/HBASE-20748 > Project: HBase > Issue Type: Improvement > Components: spark > Reporter: Charles PORROT > Assignee: Charles PORROT > Priority: Major > Labels: HBaseContext, bulkload, spark, versions > Attachments: bulkLoadCustomVersions.scala > > > The _bulkLoad_ methods of _class org.apache.hadoop.hbase.spark.HBaseContext_ > use the system's current time for the version of the cells to bulk-load. This > makes this method, and its twin _bulkLoadThinRows_, useless if you need to > use your own versionning system: > {code:java} > //Here is where we finally iterate through the data in this partition of the > //RDD that has been sorted and partitioned > val wl = writeValueToHFile( > keyFamilyQualifier.rowKey, > keyFamilyQualifier.family, > keyFamilyQualifier.qualifier, > cellValue, > nowTimeStamp, > fs, > conn, > localTableName, > conf, > familyHFileWriteOptionsMapInternal, > hfileCompression, > writerMap, > stagingDir > ){code} > > Thus, I propose a third _bulkLoad_ method, based on the original method. > Instead of using an _Iterator(KeyFamilyQualifier, Array[Byte])_ as the basis > for the writes, this new method would use an _Iterator(KeyFamilyQualifier, > Array[Byte], Long_), with the _Long_ being the version. > > Definition of _bulkLoad_: > {code:java} > def bulkLoad[T]( > rdd:RDD[T], > tableName: TableName, > flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], > stagingDir:String, > familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] = > new util.HashMap[Array[Byte], FamilyHFileWriteOptions], > compactionExclude: Boolean = false, > maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):{code} > Definition of a _bulkLoadWithCustomVersions_ method: > {code:java} > def bulkLoadCustomVersions[T](rdd:RDD[T], > tableName: TableName, > flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte], > Long)], > stagingDir:String, > familyHFileWriteOptionsMap: > util.Map[Array[Byte], FamilyHFileWriteOptions] = > new util.HashMap[Array[Byte], FamilyHFileWriteOptions], > compactionExclude: Boolean = false, > maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):{code} > In case of illogical version (for instance, a negative version), the method > would throw back to the current timestamp. > {code:java} > val wl = writeValueToHFile(keyFamilyQualifier.rowKey, > keyFamilyQualifier.family, > keyFamilyQualifier.qualifier, > cellValue, > if (version > 0) version else nowTimeStamp, > fs, > conn, > localTableName, > conf, > familyHFileWriteOptionsMapInternal, > hfileCompression, > writerMap, > stagingDir){code} > See the attached file for the file with the full proposed method. > > +Edit:+ > The same could be done with bulkLoadThinRows: instead of a: > {code:java} > Iterator[Pair[ByteArrayWrapper, FamiliesQualifiersValues]]{code} > We expect an: > {code:java} > Iterator[Triple[ByteArrayWrapper, FamiliesQualifiersValues, Long]]{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)