This is an automated email from the ASF dual-hosted git repository.

meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new d631afb  HBASE-22711 Spark connector doesn't use the given mapping 
when inserting data
d631afb is described below

commit d631afb8f656050c4d58f61afba85f5fa2662304
Author: Balazs Meszaros <[email protected]>
AuthorDate: Mon Jul 22 16:32:10 2019 +0200

    HBASE-22711 Spark connector doesn't use the given mapping when inserting 
data
    
    * insert always uses the given mapping
    
    * supports the following types:
      * binary
      * boolean
      * byte, short, int, long
      * float, double
      * date, timestamp
      * string
    
    Signed-off-by: Peter Somogyi <[email protected]>
---
 .../apache/hadoop/hbase/spark/DefaultSource.scala  | 99 +---------------------
 .../hadoop/hbase/spark/datasources/Utils.scala     | 50 +++++------
 .../hadoop/hbase/spark/DefaultSourceSuite.scala    | 92 +++++++++++++++++++-
 .../hadoop/hbase/spark/HBaseCatalogSuite.scala     |  8 +-
 4 files changed, 119 insertions(+), 130 deletions(-)

diff --git 
a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
 
b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index d9d5a66..70f5cab 100644
--- 
a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ 
b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -440,11 +440,9 @@ case class HBaseRelation (
         if (field != null) {
           if (field.isRowKey) {
             parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
-              DefaultSourceStaticUtils.getByteValue(field,
-                value.toString), null))
+                Utils.toBytes(value, field), null))
           }
-          val byteValue =
-            DefaultSourceStaticUtils.getByteValue(field, value.toString)
+          val byteValue = Utils.toBytes(value, field)
           valueArray += byteValue
         }
         new EqualLogicExpression(attr, valueArray.length - 1, false)
@@ -929,12 +927,6 @@ class ColumnFilterCollection {
 @InterfaceAudience.Private
 object DefaultSourceStaticUtils {
 
-  val rawInteger = new RawInteger
-  val rawLong = new RawLong
-  val rawFloat = new RawFloat
-  val rawDouble = new RawDouble
-  val rawString = RawString.ASCENDING
-
   val byteRange = new ThreadLocal[PositionedByteRange] {
     override def initialValue(): PositionedByteRange = {
       val range = new SimplePositionedMutableByteRange()
@@ -973,93 +965,6 @@ object DefaultSourceStaticUtils {
       lastFiveExecutionRules.poll()
     }
   }
-
-  /**
-   * This method will convert the result content from HBase into the
-   * SQL value type that is requested by the Spark SQL schema definition
-   *
-   * @param field              The structure of the SparkSQL Column
-   * @param r                       The result object from HBase
-   * @return                        The converted object type
-   */
-  def getValue(field: Field,
-      r: Result): Any = {
-    if (field.isRowKey) {
-      val row = r.getRow
-
-      field.dt match {
-        case IntegerType => rawInteger.decode(getFreshByteRange(row))
-        case LongType => rawLong.decode(getFreshByteRange(row))
-        case FloatType => rawFloat.decode(getFreshByteRange(row))
-        case DoubleType => rawDouble.decode(getFreshByteRange(row))
-        case StringType => rawString.decode(getFreshByteRange(row))
-        case TimestampType => rawLong.decode(getFreshByteRange(row))
-        case _ => Bytes.toString(row)
-      }
-    } else {
-      val cellByteValue =
-        r.getColumnLatestCell(field.cfBytes, field.colBytes)
-      if (cellByteValue == null) null
-      else field.dt match {
-        case IntegerType => 
rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case LongType => 
rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case FloatType => 
rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case DoubleType => 
rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case StringType => Bytes.toString(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength)
-        case TimestampType => 
rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case _ => Bytes.toString(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength)
-      }
-    }
-  }
-
-  /**
-   * This will convert the value from SparkSQL to be stored into HBase using 
the
-   * right byte Type
-   *
-   * @param value                   String value from SparkSQL
-   * @return                        Returns the byte array to go into HBase
-   */
-  def getByteValue(field: Field,
-      value: String): Array[Byte] = {
-    field.dt match {
-      case IntegerType =>
-        val result = new Array[Byte](Bytes.SIZEOF_INT)
-        val localDataRange = getFreshByteRange(result)
-        rawInteger.encode(localDataRange, value.toInt)
-        localDataRange.getBytes
-      case LongType =>
-        val result = new Array[Byte](Bytes.SIZEOF_LONG)
-        val localDataRange = getFreshByteRange(result)
-        rawLong.encode(localDataRange, value.toLong)
-        localDataRange.getBytes
-      case FloatType =>
-        val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
-        val localDataRange = getFreshByteRange(result)
-        rawFloat.encode(localDataRange, value.toFloat)
-        localDataRange.getBytes
-      case DoubleType =>
-        val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
-        val localDataRange = getFreshByteRange(result)
-        rawDouble.encode(localDataRange, value.toDouble)
-        localDataRange.getBytes
-      case StringType =>
-        Bytes.toBytes(value)
-      case TimestampType =>
-        val result = new Array[Byte](Bytes.SIZEOF_LONG)
-        val localDataRange = getFreshByteRange(result)
-        rawLong.encode(localDataRange, value.toLong)
-        localDataRange.getBytes
-
-      case _ => Bytes.toBytes(value)
-    }
-  }
 }
 
 /**
diff --git 
a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
 
b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
index 093c6ac..1e50585 100644
--- 
a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
+++ 
b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
@@ -18,18 +18,17 @@
 
 package org.apache.hadoop.hbase.spark.datasources
 
+import java.sql.{Date, Timestamp}
+
 import org.apache.hadoop.hbase.spark.AvroSerdes
 import org.apache.hadoop.hbase.util.Bytes
-//import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 object Utils {
 
-
   /**
     * Parses the hbase field to it's corresponding
     * scala type which can then be put into a Spark GenericRow
@@ -48,14 +47,16 @@ object Utils {
     } else  {
       // Fall back to atomic type
       f.dt match {
-        case BooleanType => toBoolean(src, offset)
+        case BooleanType => src(offset) != 0
         case ByteType => src(offset)
-        case DoubleType => Bytes.toDouble(src, offset)
-        case FloatType => Bytes.toFloat(src, offset)
-        case IntegerType => Bytes.toInt(src, offset)
-        case LongType|TimestampType => Bytes.toLong(src, offset)
         case ShortType => Bytes.toShort(src, offset)
-        case StringType => toUTF8String(src, offset, length)
+        case IntegerType => Bytes.toInt(src, offset)
+        case LongType => Bytes.toLong(src, offset)
+        case FloatType => Bytes.toFloat(src, offset)
+        case DoubleType => Bytes.toDouble(src, offset)
+        case DateType => new Date(Bytes.toLong(src, offset))
+        case TimestampType => new Timestamp(Bytes.toLong(src, offset))
+        case StringType => UTF8String.fromBytes(src, offset, length)
         case BinaryType =>
           val newArray = new Array[Byte](length)
           System.arraycopy(src, offset, newArray, 0, length)
@@ -73,28 +74,19 @@ object Utils {
       val record = field.catalystToAvro(input)
       AvroSerdes.serialize(record, field.schema.get)
     } else {
-      input match {
-        case data: Boolean => Bytes.toBytes(data)
-        case data: Byte => Array(data)
-        case data: Array[Byte] => data
-        case data: Double => Bytes.toBytes(data)
-        case data: Float => Bytes.toBytes(data)
-        case data: Int => Bytes.toBytes(data)
-        case data: Long => Bytes.toBytes(data)
-        case data: Short => Bytes.toBytes(data)
-        case data: UTF8String => data.getBytes
-        case data: String => Bytes.toBytes(data)
-        // TODO: add more data type support
+      field.dt match {
+        case BooleanType => Bytes.toBytes(input.asInstanceOf[Boolean])
+        case ByteType => Array(input.asInstanceOf[Number].byteValue)
+        case ShortType => Bytes.toBytes(input.asInstanceOf[Number].shortValue)
+        case IntegerType => Bytes.toBytes(input.asInstanceOf[Number].intValue)
+        case LongType => Bytes.toBytes(input.asInstanceOf[Number].longValue)
+        case FloatType => Bytes.toBytes(input.asInstanceOf[Number].floatValue)
+        case DoubleType => 
Bytes.toBytes(input.asInstanceOf[Number].doubleValue)
+        case DateType | TimestampType => 
Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime)
+        case StringType => Bytes.toBytes(input.toString)
+        case BinaryType => input.asInstanceOf[Array[Byte]]
         case _ => throw new Exception(s"unsupported data type ${field.dt}")
       }
     }
   }
-
-  def toBoolean(input: Array[Byte], offset: Int): Boolean = {
-    input(offset) != 0
-  }
-
-  def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = 
{
-    UTF8String.fromBytes(input.slice(offset, offset + length))
-  }
 }
diff --git 
a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
 
b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index afe515b..72a84cf 100644
--- 
a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ 
b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hbase.spark
 
+import java.sql.{Date, Timestamp}
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
@@ -89,8 +91,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
   val t1TableName = "t1"
   val t2TableName = "t2"
+  val t3TableName = "t3"
   val columnFamily = "c"
 
+  val timestamp = 1234567890000L
+
   var sqlContext:SQLContext = null
   var df:DataFrame = null
 
@@ -109,12 +114,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     catch {
       case e: Exception => logInfo(" - no table " + t2TableName + " found")
     }
+    try
+      TEST_UTIL.deleteTable(TableName.valueOf(t3TableName))
+    catch {
+      case e: Exception => logInfo(" - no table " + t3TableName + " found")
+    }
+
     logInfo(" - creating table " + t1TableName)
     TEST_UTIL.createTable(TableName.valueOf(t1TableName), 
Bytes.toBytes(columnFamily))
     logInfo(" - created table")
     logInfo(" - creating table " + t2TableName)
     TEST_UTIL.createTable(TableName.valueOf(t2TableName), 
Bytes.toBytes(columnFamily))
     logInfo(" - created table")
+    logInfo(" - creating table " + t3TableName)
+    TEST_UTIL.createTable(TableName.valueOf(t3TableName), 
Bytes.toBytes(columnFamily))
+    logInfo(" - created table")
+
     val sparkConf = new SparkConf
     sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
     sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
@@ -124,7 +139,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     val connection = 
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
     try {
-      val t1Table = connection.getTable(TableName.valueOf("t1"))
+      val t1Table = connection.getTable(TableName.valueOf(t1TableName))
 
       try {
         var put = new Put(Bytes.toBytes("get1"))
@@ -158,7 +173,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
         t1Table.close()
       }
 
-      val t2Table = connection.getTable(TableName.valueOf("t2"))
+      val t2Table = connection.getTable(TableName.valueOf(t2TableName))
 
       try {
         var put = new Put(Bytes.toBytes(1))
@@ -191,6 +206,26 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       } finally {
         t2Table.close()
       }
+
+      val t3Table = connection.getTable(TableName.valueOf(t3TableName))
+
+      try {
+        val put = new Put(Bytes.toBytes("row"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("binary"), 
Array(1.toByte, 2.toByte, 3.toByte))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("boolean"), 
Bytes.toBytes(true))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("byte"), 
Array(127.toByte))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("short"), 
Bytes.toBytes(32767.toShort))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("int"), 
Bytes.toBytes(1000000))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("long"), 
Bytes.toBytes(10000000000L))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("float"), 
Bytes.toBytes(0.5f))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("double"), 
Bytes.toBytes(0.125))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("date"), 
Bytes.toBytes(timestamp))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("timestamp"), 
Bytes.toBytes(timestamp))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("string"), 
Bytes.toBytes("string"))
+        t3Table.put(put)
+      } finally {
+        t3Table.close()
+      }
     } finally {
       connection.close()
     }
@@ -807,6 +842,59 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(executionRules.dynamicLogicExpression == null)
   }
 
+  test("Test mapping") {
+    val catalog = s"""{
+                     |"table":{"namespace":"default", "name":"t3"},
+                     |"rowkey":"key",
+                     |"columns":{
+                     |"KEY_FIELD":{"cf":"rowkey", "col":"key", 
"type":"string"},
+                     |"BINARY_FIELD":{"cf":"c", "col":"binary", 
"type":"binary"},
+                     |"BOOLEAN_FIELD":{"cf":"c", "col":"boolean", 
"type":"boolean"},
+                     |"BYTE_FIELD":{"cf":"c", "col":"byte", "type":"byte"},
+                     |"SHORT_FIELD":{"cf":"c", "col":"short", "type":"short"},
+                     |"INT_FIELD":{"cf":"c", "col":"int", "type":"int"},
+                     |"LONG_FIELD":{"cf":"c", "col":"long", "type":"long"},
+                     |"FLOAT_FIELD":{"cf":"c", "col":"float", "type":"float"},
+                     |"DOUBLE_FIELD":{"cf":"c", "col":"double", 
"type":"double"},
+                     |"DATE_FIELD":{"cf":"c", "col":"date", "type":"date"},
+                     |"TIMESTAMP_FIELD":{"cf":"c", "col":"timestamp", 
"type":"timestamp"},
+                     |"STRING_FIELD":{"cf":"c", "col":"string", 
"type":"string"}
+                     |}
+                     |}""".stripMargin
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map(HBaseTableCatalog.tableCatalog->catalog))
+
+    df.registerTempTable("hbaseTestMapping")
+
+    val results = sqlContext.sql("SELECT binary_field, boolean_field, " +
+      "byte_field, short_field, int_field, long_field, " +
+      "float_field, double_field, date_field, timestamp_field, " +
+      "string_field FROM hbaseTestMapping").collect()
+
+    assert(results.length == 1)
+
+    val result = results(0)
+
+    System.out.println("row: " + result)
+    System.out.println("0: " + result.get(0))
+    System.out.println("1: " + result.get(1))
+    System.out.println("2: " + result.get(2))
+    System.out.println("3: " + result.get(3))
+
+    
assert(result.get(0).asInstanceOf[Array[Byte]].sameElements(Array(1.toByte, 
2.toByte, 3.toByte)))
+    assert(result.get(1) == true)
+    assert(result.get(2) == 127)
+    assert(result.get(3) == 32767)
+    assert(result.get(4) == 1000000)
+    assert(result.get(5) == 10000000000L)
+    assert(result.get(6) == 0.5)
+    assert(result.get(7) == 0.125)
+    // sql date stores only year, month and day, so checking it is within a day
+    assert(Math.abs(result.get(8).asInstanceOf[Date].getTime - timestamp) <= 
86400000)
+    assert(result.get(9).asInstanceOf[Timestamp].getTime == timestamp)
+    assert(result.get(10) == "string")
+  }
+
   def writeCatalog = s"""{
                     |"table":{"namespace":"default", "name":"table1"},
                     |"rowkey":"key",
diff --git 
a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
 
b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
index 74bf912..d6245a6 100644
--- 
a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
+++ 
b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
@@ -38,7 +38,9 @@ class HBaseCatalogSuite extends FunSuite with 
BeforeAndAfterEach with BeforeAndA
                     |"col5":{"cf":"cf1", "col":"col4", "type":"double", 
"serdes":"${classOf[DoubleSerDes].getName}"},
                     |"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
                     |"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
-                    |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
+                    |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"},
+                    |"col9":{"cf":"cf1", "col":"col8", "type":"date"},
+                    |"col10":{"cf":"cf1", "col":"col9", "type":"timestamp"}
                     |}
                     |}""".stripMargin
   val parameters = Map(HBaseTableCatalog.tableCatalog->catalog)
@@ -63,6 +65,8 @@ class HBaseCatalogSuite extends FunSuite with 
BeforeAndAfterEach with BeforeAndA
     assert(t.getField("col2").length == Bytes.SIZEOF_DOUBLE)
     assert(t.getField("col1").length == -1)
     assert(t.getField("col8").length == -1)
+    assert(t.getField("col9").dt == DateType)
+    assert(t.getField("col10").dt == TimestampType)
   }
 
   checkDataType(
@@ -95,7 +99,7 @@ class HBaseCatalogSuite extends FunSuite with 
BeforeAndAfterEach with BeforeAndA
     assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
   }
 
-  test("compatiblity") {
+  test("compatibility") {
     val m = Map("hbase.columns.mapping" ->
       "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD 
BINARY c:c,",
       "hbase.table" -> "t1")

Reply via email to