This is an automated email from the ASF dual-hosted git repository.
meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new d631afb HBASE-22711 Spark connector doesn't use the given mapping when inserting data
d631afb is described below
commit d631afb8f656050c4d58f61afba85f5fa2662304
Author: Balazs Meszaros <[email protected]>
AuthorDate: Mon Jul 22 16:32:10 2019 +0200
HBASE-22711 Spark connector doesn't use the given mapping when inserting data
* insert always uses the given mapping
* supports the following types:
* binary
* boolean
* byte, short, int, long
* float, double
* date, timestamp
* string
Signed-off-by: Peter Somogyi <[email protected]>
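For context, a minimal write sketch showing how the catalog mapping drives serialization after this change. This is illustrative only: the DataFrame `df` and its schema are assumed, the catalog mirrors the new t3 test table, and only HBaseTableCatalog.tableCatalog and the "org.apache.hadoop.hbase.spark" format string are taken from the code in this repository.

    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

    // Catalog: the declared "type" of each column (not the runtime Scala type)
    // now decides how values are encoded on insert.
    val catalog =
      s"""{
         |"table":{"namespace":"default", "name":"t3"},
         |"rowkey":"key",
         |"columns":{
         |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
         |"LONG_FIELD":{"cf":"c", "col":"long", "type":"long"},
         |"TIMESTAMP_FIELD":{"cf":"c", "col":"timestamp", "type":"timestamp"}
         |}
         |}""".stripMargin

    // Assumes df is an existing DataFrame whose columns match the catalog and
    // that the target table already exists (table-creation options are out of scope here).
    df.write
      .format("org.apache.hadoop.hbase.spark")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .save()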
---
.../apache/hadoop/hbase/spark/DefaultSource.scala | 99 +---------------------
.../hadoop/hbase/spark/datasources/Utils.scala | 50 +++++------
.../hadoop/hbase/spark/DefaultSourceSuite.scala | 92 +++++++++++++++++++-
.../hadoop/hbase/spark/HBaseCatalogSuite.scala | 8 +-
4 files changed, 119 insertions(+), 130 deletions(-)
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index d9d5a66..70f5cab 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -440,11 +440,9 @@ case class HBaseRelation (
if (field != null) {
if (field.isRowKey) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
- DefaultSourceStaticUtils.getByteValue(field,
- value.toString), null))
+ Utils.toBytes(value, field), null))
}
- val byteValue =
- DefaultSourceStaticUtils.getByteValue(field, value.toString)
+ val byteValue = Utils.toBytes(value, field)
valueArray += byteValue
}
new EqualLogicExpression(attr, valueArray.length - 1, false)
@@ -929,12 +927,6 @@ class ColumnFilterCollection {
@InterfaceAudience.Private
object DefaultSourceStaticUtils {
- val rawInteger = new RawInteger
- val rawLong = new RawLong
- val rawFloat = new RawFloat
- val rawDouble = new RawDouble
- val rawString = RawString.ASCENDING
-
val byteRange = new ThreadLocal[PositionedByteRange] {
override def initialValue(): PositionedByteRange = {
val range = new SimplePositionedMutableByteRange()
@@ -973,93 +965,6 @@ object DefaultSourceStaticUtils {
lastFiveExecutionRules.poll()
}
}
-
- /**
- * This method will convert the result content from HBase into the
- * SQL value type that is requested by the Spark SQL schema definition
- *
- * @param field The structure of the SparkSQL Column
- * @param r The result object from HBase
- * @return The converted object type
- */
- def getValue(field: Field,
- r: Result): Any = {
- if (field.isRowKey) {
- val row = r.getRow
-
- field.dt match {
- case IntegerType => rawInteger.decode(getFreshByteRange(row))
- case LongType => rawLong.decode(getFreshByteRange(row))
- case FloatType => rawFloat.decode(getFreshByteRange(row))
- case DoubleType => rawDouble.decode(getFreshByteRange(row))
- case StringType => rawString.decode(getFreshByteRange(row))
- case TimestampType => rawLong.decode(getFreshByteRange(row))
- case _ => Bytes.toString(row)
- }
- } else {
- val cellByteValue =
- r.getColumnLatestCell(field.cfBytes, field.colBytes)
- if (cellByteValue == null) null
- else field.dt match {
- case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case StringType => Bytes.toString(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength)
- case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case _ => Bytes.toString(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength)
- }
- }
- }
-
- /**
- * This will convert the value from SparkSQL to be stored into HBase using the
- * right byte Type
- *
- * @param value String value from SparkSQL
- * @return Returns the byte array to go into HBase
- */
- def getByteValue(field: Field,
- value: String): Array[Byte] = {
- field.dt match {
- case IntegerType =>
- val result = new Array[Byte](Bytes.SIZEOF_INT)
- val localDataRange = getFreshByteRange(result)
- rawInteger.encode(localDataRange, value.toInt)
- localDataRange.getBytes
- case LongType =>
- val result = new Array[Byte](Bytes.SIZEOF_LONG)
- val localDataRange = getFreshByteRange(result)
- rawLong.encode(localDataRange, value.toLong)
- localDataRange.getBytes
- case FloatType =>
- val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
- val localDataRange = getFreshByteRange(result)
- rawFloat.encode(localDataRange, value.toFloat)
- localDataRange.getBytes
- case DoubleType =>
- val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
- val localDataRange = getFreshByteRange(result)
- rawDouble.encode(localDataRange, value.toDouble)
- localDataRange.getBytes
- case StringType =>
- Bytes.toBytes(value)
- case TimestampType =>
- val result = new Array[Byte](Bytes.SIZEOF_LONG)
- val localDataRange = getFreshByteRange(result)
- rawLong.encode(localDataRange, value.toLong)
- localDataRange.getBytes
-
- case _ => Bytes.toBytes(value)
- }
- }
}
/**
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
index 093c6ac..1e50585 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
@@ -18,18 +18,17 @@
package org.apache.hadoop.hbase.spark.datasources
+import java.sql.{Date, Timestamp}
+
import org.apache.hadoop.hbase.spark.AvroSerdes
import org.apache.hadoop.hbase.util.Bytes
-//import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
object Utils {
-
/**
* Parses the hbase field to it's corresponding
* scala type which can then be put into a Spark GenericRow
@@ -48,14 +47,16 @@ object Utils {
} else {
// Fall back to atomic type
f.dt match {
- case BooleanType => toBoolean(src, offset)
+ case BooleanType => src(offset) != 0
case ByteType => src(offset)
- case DoubleType => Bytes.toDouble(src, offset)
- case FloatType => Bytes.toFloat(src, offset)
- case IntegerType => Bytes.toInt(src, offset)
- case LongType|TimestampType => Bytes.toLong(src, offset)
case ShortType => Bytes.toShort(src, offset)
- case StringType => toUTF8String(src, offset, length)
+ case IntegerType => Bytes.toInt(src, offset)
+ case LongType => Bytes.toLong(src, offset)
+ case FloatType => Bytes.toFloat(src, offset)
+ case DoubleType => Bytes.toDouble(src, offset)
+ case DateType => new Date(Bytes.toLong(src, offset))
+ case TimestampType => new Timestamp(Bytes.toLong(src, offset))
+ case StringType => UTF8String.fromBytes(src, offset, length)
case BinaryType =>
val newArray = new Array[Byte](length)
System.arraycopy(src, offset, newArray, 0, length)
@@ -73,28 +74,19 @@ object Utils {
val record = field.catalystToAvro(input)
AvroSerdes.serialize(record, field.schema.get)
} else {
- input match {
- case data: Boolean => Bytes.toBytes(data)
- case data: Byte => Array(data)
- case data: Array[Byte] => data
- case data: Double => Bytes.toBytes(data)
- case data: Float => Bytes.toBytes(data)
- case data: Int => Bytes.toBytes(data)
- case data: Long => Bytes.toBytes(data)
- case data: Short => Bytes.toBytes(data)
- case data: UTF8String => data.getBytes
- case data: String => Bytes.toBytes(data)
- // TODO: add more data type support
+ field.dt match {
+ case BooleanType => Bytes.toBytes(input.asInstanceOf[Boolean])
+ case ByteType => Array(input.asInstanceOf[Number].byteValue)
+ case ShortType => Bytes.toBytes(input.asInstanceOf[Number].shortValue)
+ case IntegerType => Bytes.toBytes(input.asInstanceOf[Number].intValue)
+ case LongType => Bytes.toBytes(input.asInstanceOf[Number].longValue)
+ case FloatType => Bytes.toBytes(input.asInstanceOf[Number].floatValue)
+ case DoubleType => Bytes.toBytes(input.asInstanceOf[Number].doubleValue)
+ case DateType | TimestampType => Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime)
+ case StringType => Bytes.toBytes(input.toString)
+ case BinaryType => input.asInstanceOf[Array[Byte]]
case _ => throw new Exception(s"unsupported data type ${field.dt}")
}
}
}
-
- def toBoolean(input: Array[Byte], offset: Int): Boolean = {
- input(offset) != 0
- }
-
- def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = {
- UTF8String.fromBytes(input.slice(offset, offset + length))
- }
}
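For readers skimming the Utils.scala hunk above: the new toBytes/hbaseFieldToScalaType pair keeps the on-disk encoding symmetric, with date and timestamp columns stored as 8-byte epoch-millis longs. A standalone sketch of that convention, using only plain HBase Bytes and java.sql types (the object name is illustrative, not part of this patch):

    import java.sql.Timestamp
    import org.apache.hadoop.hbase.util.Bytes

    object TimestampRoundTrip {
      def main(args: Array[String]): Unit = {
        // Write side: a catalog "timestamp" column becomes the 8-byte epoch-millis encoding.
        val ts = new Timestamp(1234567890000L)
        val cell: Array[Byte] = Bytes.toBytes(ts.getTime)

        // Read side: the same bytes decode back into java.sql.Timestamp.
        val decoded = new Timestamp(Bytes.toLong(cell))
        assert(decoded == ts)
      }
    }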
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index afe515b..72a84cf 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -17,6 +17,8 @@
package org.apache.hadoop.hbase.spark
+import java.sql.{Date, Timestamp}
+
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
@@ -89,8 +91,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val t1TableName = "t1"
val t2TableName = "t2"
+ val t3TableName = "t3"
val columnFamily = "c"
+ val timestamp = 1234567890000L
+
var sqlContext:SQLContext = null
var df:DataFrame = null
@@ -109,12 +114,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
catch {
case e: Exception => logInfo(" - no table " + t2TableName + " found")
}
+ try
+ TEST_UTIL.deleteTable(TableName.valueOf(t3TableName))
+ catch {
+ case e: Exception => logInfo(" - no table " + t3TableName + " found")
+ }
+
logInfo(" - creating table " + t1TableName)
TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t2TableName)
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
+ logInfo(" - creating table " + t3TableName)
+ TEST_UTIL.createTable(TableName.valueOf(t3TableName), Bytes.toBytes(columnFamily))
+ logInfo(" - created table")
+
val sparkConf = new SparkConf
sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
@@ -124,7 +139,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
try {
- val t1Table = connection.getTable(TableName.valueOf("t1"))
+ val t1Table = connection.getTable(TableName.valueOf(t1TableName))
try {
var put = new Put(Bytes.toBytes("get1"))
@@ -158,7 +173,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
t1Table.close()
}
- val t2Table = connection.getTable(TableName.valueOf("t2"))
+ val t2Table = connection.getTable(TableName.valueOf(t2TableName))
try {
var put = new Put(Bytes.toBytes(1))
@@ -191,6 +206,26 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
} finally {
t2Table.close()
}
+
+ val t3Table = connection.getTable(TableName.valueOf(t3TableName))
+
+ try {
+ val put = new Put(Bytes.toBytes("row"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("binary"), Array(1.toByte, 2.toByte, 3.toByte))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("boolean"), Bytes.toBytes(true))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("byte"), Array(127.toByte))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("short"), Bytes.toBytes(32767.toShort))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("int"), Bytes.toBytes(1000000))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("long"), Bytes.toBytes(10000000000L))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("float"), Bytes.toBytes(0.5f))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("double"), Bytes.toBytes(0.125))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("date"), Bytes.toBytes(timestamp))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("timestamp"), Bytes.toBytes(timestamp))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("string"), Bytes.toBytes("string"))
+ t3Table.put(put)
+ } finally {
+ t3Table.close()
+ }
} finally {
connection.close()
}
@@ -807,6 +842,59 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(executionRules.dynamicLogicExpression == null)
}
+ test("Test mapping") {
+ val catalog = s"""{
+ |"table":{"namespace":"default", "name":"t3"},
+ |"rowkey":"key",
+ |"columns":{
+ |"KEY_FIELD":{"cf":"rowkey", "col":"key",
"type":"string"},
+ |"BINARY_FIELD":{"cf":"c", "col":"binary",
"type":"binary"},
+ |"BOOLEAN_FIELD":{"cf":"c", "col":"boolean",
"type":"boolean"},
+ |"BYTE_FIELD":{"cf":"c", "col":"byte", "type":"byte"},
+ |"SHORT_FIELD":{"cf":"c", "col":"short", "type":"short"},
+ |"INT_FIELD":{"cf":"c", "col":"int", "type":"int"},
+ |"LONG_FIELD":{"cf":"c", "col":"long", "type":"long"},
+ |"FLOAT_FIELD":{"cf":"c", "col":"float", "type":"float"},
+ |"DOUBLE_FIELD":{"cf":"c", "col":"double",
"type":"double"},
+ |"DATE_FIELD":{"cf":"c", "col":"date", "type":"date"},
+ |"TIMESTAMP_FIELD":{"cf":"c", "col":"timestamp",
"type":"timestamp"},
+ |"STRING_FIELD":{"cf":"c", "col":"string",
"type":"string"}
+ |}
+ |}""".stripMargin
+ df = sqlContext.load("org.apache.hadoop.hbase.spark",
+ Map(HBaseTableCatalog.tableCatalog->catalog))
+
+ df.registerTempTable("hbaseTestMapping")
+
+ val results = sqlContext.sql("SELECT binary_field, boolean_field, " +
+ "byte_field, short_field, int_field, long_field, " +
+ "float_field, double_field, date_field, timestamp_field, " +
+ "string_field FROM hbaseTestMapping").collect()
+
+ assert(results.length == 1)
+
+ val result = results(0)
+
+ System.out.println("row: " + result)
+ System.out.println("0: " + result.get(0))
+ System.out.println("1: " + result.get(1))
+ System.out.println("2: " + result.get(2))
+ System.out.println("3: " + result.get(3))
+
+ assert(result.get(0).asInstanceOf[Array[Byte]].sameElements(Array(1.toByte, 2.toByte, 3.toByte)))
+ assert(result.get(1) == true)
+ assert(result.get(2) == 127)
+ assert(result.get(3) == 32767)
+ assert(result.get(4) == 1000000)
+ assert(result.get(5) == 10000000000L)
+ assert(result.get(6) == 0.5)
+ assert(result.get(7) == 0.125)
+ // sql date stores only year, month and day, so checking it is within a day
+ assert(Math.abs(result.get(8).asInstanceOf[Date].getTime - timestamp) <= 86400000)
+ assert(result.get(9).asInstanceOf[Timestamp].getTime == timestamp)
+ assert(result.get(10) == "string")
+ }
+
def writeCatalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
index 74bf912..d6245a6 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
@@ -38,7 +38,9 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA
|"col5":{"cf":"cf1", "col":"col4", "type":"double",
"serdes":"${classOf[DoubleSerDes].getName}"},
|"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
|"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
- |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
+ |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"},
+ |"col9":{"cf":"cf1", "col":"col8", "type":"date"},
+ |"col10":{"cf":"cf1", "col":"col9", "type":"timestamp"}
|}
|}""".stripMargin
val parameters = Map(HBaseTableCatalog.tableCatalog->catalog)
@@ -63,6 +65,8 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA
assert(t.getField("col2").length == Bytes.SIZEOF_DOUBLE)
assert(t.getField("col1").length == -1)
assert(t.getField("col8").length == -1)
+ assert(t.getField("col9").dt == DateType)
+ assert(t.getField("col10").dt == TimestampType)
}
checkDataType(
@@ -95,7 +99,7 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA
assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
}
- test("compatiblity") {
+ test("compatibility") {
val m = Map("hbase.columns.mapping" ->
"KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD
BINARY c:c,",
"hbase.table" -> "t1")