Repository: hbase
Updated Branches:
  refs/heads/master 5a7c8dcb6 -> dbdfd8e8d


HBASE-15572 Adding optional timestamp semantics to HBase-Spark

Four parameters, "timestamp", "minTimestamp", "maxTimestamp" and
"maxVersions", are added to HBaseSparkConf. Users can select a single
timestamp, or they can select a time range bounded by a minimum and a
maximum timestamp.
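
For readers skimming the patch, a minimal sketch of how these options might be
exercised from Spark SQL, following the read pattern used in DefaultSourceSuite
below ("catalog", "tsSpecified", "minTs" and "maxTs" are placeholders supplied
by the caller, not part of this change):

  import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
  import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

  // Read only the cells written at one specific timestamp.
  val atTs = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
      HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
    .format("org.apache.hadoop.hbase.spark")
    .load()

  // Read the cells whose timestamps fall in [minTs, maxTs).
  val inRange = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
      HBaseSparkConf.MIN_TIMESTAMP -> minTs.toString,
      HBaseSparkConf.MAX_TIMESTAMP -> maxTs.toString))
    .format("org.apache.hadoop.hbase.spark")
    .load()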

Signed-off-by: Sean Busbey <[email protected]>
Signed-off-by: Ted Yu <[email protected]>
Signed-off-by: Jerry He <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/58177c10
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/58177c10
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/58177c10

Branch: refs/heads/master
Commit: 58177c103fe60a0519ea107270875d82bc58645d
Parents: 5a7c8dc
Author: Weiqing Yang <[email protected]>
Authored: Wed Apr 13 22:35:27 2016 -0500
Committer: Sean Busbey <[email protected]>
Committed: Wed Apr 13 22:39:14 2016 -0500

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      |   7 +-
 .../spark/datasources/HBaseSparkConf.scala      |   5 +
 .../spark/datasources/HBaseTableScanRDD.scala   |  26 +++++
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 106 ++++++++++++++++---
 src/main/asciidoc/_chapters/spark.adoc          |  35 ++++++
 5 files changed, 165 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/58177c10/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 7970816..1697036 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -88,6 +88,11 @@ case class HBaseRelation (
     userSpecifiedSchema: Option[StructType]
   )(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan  with InsertableRelation  with Logging {
+  val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
+  val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
+  val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+  val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
+
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
@@ -204,7 +209,7 @@ case class HBaseRelation (
         System.arraycopy(x, 0, rBytes, offset, x.length)
         offset += x.length
       }
-      val put = new Put(rBytes)
+      val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
 
       colsIdxedFields.foreach { case (x, y) =>
         val b = Utils.toBytes(row(x), y)

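For readers less familiar with Option.fold, the new Put construction above is
equivalent to the following standalone sketch (not part of the patch): when the
hbase.spark.query.timestamp option is set, cells are written at that timestamp;
otherwise the region server assigns the current time as before.

  val put = timestamp match {
    case Some(ts) => new Put(rBytes, ts) // user-supplied write timestamp
    case None     => new Put(rBytes)     // default: server-assigned timestamp
  }
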
http://git-wip-us.apache.org/repos/asf/hbase/blob/58177c10/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index ca44d42..be2af30 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -36,4 +36,9 @@ object HBaseSparkConf{
   val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
   val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
   val defaultPushDownColumnFilter = true
+
+  val TIMESTAMP = "hbase.spark.query.timestamp"
+  val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
+  val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+  val MAX_VERSIONS = "hbase.spark.query.maxVersions"
 }

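The new keys are plain configuration strings, so they can be supplied on the
write path as well as the read path. A minimal sketch of a timestamped write,
mirroring the test below ("df", "writeCatalog" and "chosenTs" are placeholders,
and the target table is assumed to exist already):

  // Every cell produced by this save is written at the caller-chosen timestamp.
  df.write
    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
      HBaseSparkConf.TIMESTAMP -> chosenTs.toString))
    .format("org.apache.hadoop.hbase.spark")
    .save()
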
http://git-wip-us.apache.org/repos/asf/hbase/blob/58177c10/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 2e05651..5b45ef9 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -105,6 +105,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       val gets = new ArrayList[Get]()
       x.foreach{ y =>
         val g = new Get(y)
+        handleTimeSemantics(g)
         columns.foreach { d =>
           if (!d.isRowKey) {
             g.addColumn(d.cfBytes, d.colBytes)
@@ -157,6 +158,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       case (Some(Bound(a, b)), None) => new Scan(a)
       case (None, None) => new Scan()
     }
+    handleTimeSemantics(scan)
 
     columns.foreach { d =>
       if (!d.isRowKey) {
@@ -226,6 +228,30 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     } ++ gIt
     rIts
   }
+
+  private def handleTimeSemantics(query: Query): Unit = {
+    // Set timestamp related values if present
+    (query, relation.timestamp, relation.minTimestamp, relation.maxTimestamp) match {
+      case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
+      case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
+
+      case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
+      case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
+
+      case (q, None, None, None) =>
+
+      case _ => throw new IllegalArgumentException(s"Invalid combination of query/timestamp/time range provided. " +
+        s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: ${relation.minTimestamp.get}, " +
+        s"maxTimeStamp is: ${relation.maxTimestamp.get}")
+    }
+    if (relation.maxVersions.isDefined) {
+      query match {
+        case q: Scan => q.setMaxVersions(relation.maxVersions.get)
+        case q: Get => q.setMaxVersions(relation.maxVersions.get)
+        case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions")
+      }
+    }
+  }
 }
 
 case class SerializedFilter(b: Option[Array[Byte]])

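Note the effect of the catch-all case in handleTimeSemantics: combining a
point-in-time timestamp with a time range is rejected rather than silently
ignored. A minimal sketch of that failure mode ("catalog" is a placeholder
HBaseTableCatalog mapping; the error surfaces once an action triggers the scan):

  val conflicting = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
      HBaseSparkConf.TIMESTAMP -> "0",
      HBaseSparkConf.MIN_TIMESTAMP -> "0",
      HBaseSparkConf.MAX_TIMESTAMP -> "10"))
    .format("org.apache.hadoop.hbase.spark")
    .load()
  // conflicting.count() is expected to fail with IllegalArgumentException
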
http://git-wip-us.apache.org/repos/asf/hbase/blob/58177c10/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 500967d..4312b38 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -19,32 +19,39 @@ package org.apache.hadoop.hbase.spark
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
 import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
+import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
 import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 case class HBaseRecord(
-    col0: String,
-    col1: String,
-    col2: Double,
-    col3: Float,
-    col4: Int,
-    col5: Long)
+  col0: String,
+  col1: Boolean,
+  col2: Double,
+  col3: Float,
+  col4: Int,
+  col5: Long,
+  col6: Short,
+  col7: String,
+  col8: Byte)
 
 object HBaseRecord {
   def apply(i: Int, t: String): HBaseRecord = {
     val s = s"""row${"%03d".format(i)}"""
     HBaseRecord(s,
-      s,
+      i % 2 == 0,
       i.toDouble,
       i.toFloat,
       i,
-      i.toLong)
+      i.toLong,
+      i.toShort,
+      s"String$i: $t",
+      i.toByte)
   }
 }
 
@@ -815,11 +822,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
                     |"rowkey":"key",
                     |"columns":{
                     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
-                    |"col1":{"cf":"cf1", "col":"col1", "type":"string"},
+                    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
                     |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
                     |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
                     |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
-                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}}
+                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
+                    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
+                    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
+                    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
                     |}
                     |}""".stripMargin
 
@@ -866,6 +876,76 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(s.count() == 6)
   }
 
+  test("Timestamp semantics") {
+    val sql = sqlContext
+    import sql.implicits._
+
+    // There's already some data in here from recently. Let's throw something in
+    // from 1993 which we can include/exclude and add some data with the implicit (now) timestamp.
+    // Then we should be able to cross-section it and only get points in between, get the most recent view
+    // and get an old view.
+    val oldMs = 754869600000L
+    val startMs = System.currentTimeMillis()
+    val oldData = (0 to 100).map { i =>
+      HBaseRecord(i, "old")
+    }
+    val newData = (200 to 255).map { i =>
+      HBaseRecord(i, "new")
+    }
+
+    sc.parallelize(oldData).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
+        HBaseSparkConf.TIMESTAMP -> oldMs.toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+    sc.parallelize(newData).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+
+    // Test specific timestamp -- Full scan, Timestamp
+    val individualTimestamp = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(individualTimestamp.count() == 101)
+
+    // Test getting everything -- Full Scan, No range
+    val everything = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(everything.count() == 256)
+    // Test getting everything -- Pruned Scan, TimeRange
+    val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
+    assert(element50 == "String50: extra")
+    val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
+    assert(element200 == "String200: new")
+
+    // Test Getting old stuff -- Full Scan, TimeRange
+    val oldRange = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
+        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(oldRange.count() == 101)
+    // Test Getting old stuff -- Pruned Scan, TimeRange
+    val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
+    assert(oldElement50 == "String50: old")
+
+    // Test Getting middle stuff -- Full Scan, TimeRange
+    val middleRange = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
+        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(middleRange.count() == 256)
+    // Test Getting middle stuff -- Pruned Scan, TimeRange
+    val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
+    assert(middleElement200 == "String200: extra")
+  }
+
+
   // catalog for insertion
   def avroWriteCatalog = s"""{
                             |"table":{"namespace":"default", "name":"avrotable"},

http://git-wip-us.apache.org/repos/asf/hbase/blob/58177c10/src/main/asciidoc/_chapters/spark.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc
index b1bdb5d..22ed468 100644
--- a/src/main/asciidoc/_chapters/spark.adoc
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -395,6 +395,41 @@ The HBase-Spark module includes support for Spark SQL and DataFrames, which allo
 you to write SparkSQL directly on HBase tables. In addition the HBase-Spark
 will push down query filtering logic to HBase.
 
+HBaseSparkConf exposes four timestamp-related parameters: TIMESTAMP, MIN_TIMESTAMP,
+MAX_TIMESTAMP and MAX_VERSIONS. Users can query records written at a specific
+timestamp, or within a time range bounded by MIN_TIMESTAMP and MAX_TIMESTAMP. Note that
+tsSpecified and oldMs in the examples below are placeholders to be replaced with concrete values.
+
+.Query with different timestamps
+====
+
+The example below shows how to load the DataFrame df for a specific timestamp.
+tsSpecified is supplied by the user.
+HBaseTableCatalog defines the mapping between the HBase table and the DataFrame schema.
+writeCatalog holds the catalog that describes this schema mapping.
+----
+val df = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+----
+
+The example below shows how to load the DataFrame df for a time range.
+oldMs is supplied by the user.
+----
+val df = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
+        HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+----
+
+After loading the DataFrame df, users can query data.
+----
+    df.registerTempTable("table")
+    sqlContext.sql("select count(col1) from table").show
+----
+
 === Predicate Push Down
 
 There are two examples of predicate push down in the HBase-Spark implementation.

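The added documentation covers TIMESTAMP and the MIN_TIMESTAMP/MAX_TIMESTAMP range
but not MAX_VERSIONS. A minimal sketch of how that key could be combined with a
range read, under the same placeholder-catalog assumption as the examples above
(maxMs is a caller-chosen upper bound; results are untested here):

  // Ask the underlying Scan/Get to keep up to 3 versions per cell in the range.
  val multiVersion = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
      HBaseSparkConf.MIN_TIMESTAMP -> "0",
      HBaseSparkConf.MAX_TIMESTAMP -> maxMs.toString,
      HBaseSparkConf.MAX_VERSIONS -> "3"))
    .format("org.apache.hadoop.hbase.spark")
    .load()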