Repository: hbase
Updated Branches:
  refs/heads/master fee67bcf1 -> 35d7a0cd0
HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35d7a0cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35d7a0cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35d7a0cd

Branch: refs/heads/master
Commit: 35d7a0cd0798cabe7df5766fcc993512eca6c92e
Parents: fee67bc
Author: Jerry He <jerry...@apache.org>
Authored: Mon Mar 13 12:02:07 2017 -0700
Committer: Jerry He <jerry...@apache.org>
Committed: Mon Mar 13 12:02:07 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 28 ++++-----
 .../hbase/spark/HBaseConnectionCache.scala      |  2 +-
 .../spark/datasources/HBaseSparkConf.scala      | 62 ++++++++++++--------
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++---
 .../spark/DynamicLogicExpressionSuite.scala     |  2 +-
 .../hadoop/hbase/spark/HBaseTestSource.scala    | 13 ++--
 .../hbase/spark/PartitionFilterSuite.scala      |  6 +-
 7 files changed, 69 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a8b2ab8..b2b646a 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -97,36 +97,36 @@ case class HBaseRelation (
   )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
   val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
-  val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+  val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
+  val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
   val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-  val encoderClsName = parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+  val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
   @transient val encoder = JavaBytesEncoder.create(encoderClsName)
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
-  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
-  val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
-  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
-    .map(_.toBoolean).getOrElse(true)
+  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
+  val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
+  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
+    .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
   // The user supplied per table parameter will overwrite global ones in SparkConf
-  val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+  val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
     .getOrElse(
       sqlContext.sparkContext.getConf.getBoolean(
-        HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
-  val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+        HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
+  val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
     .getOrElse(
       sqlContext.sparkContext.getConf.getInt(
-      HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
-  val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+      HBaseSparkConf.QUERY_CACHEDROWS, -1))
+  val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))
+    HBaseSparkConf.QUERY_BATCHSIZE, -1))
   val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.defaultBulkGetSize))
+    HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.DEFAULT_BULKGET_SIZE))
   //create or get latest HBaseContext
   val hbaseContext:HBaseContext = if (useHBaseContext) {


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index fb5833e..2858da8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -37,7 +37,7 @@ private[spark] object HBaseConnectionCache extends Logging {
   val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
   // in milliseconds
-  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
+  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
   private var timeout = DEFAULT_TIME_OUT
   private var closed: Boolean = false
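
The DefaultSource changes above keep the existing lookup order while moving to the new key names: a per-table parameter overrides the global SparkConf entry, which in turn overrides the built-in default. A minimal sketch of that order, assuming a plain parameter map and a SparkConf (the object and method names below are illustrative, not part of the patch):

    import org.apache.spark.SparkConf

    object ResolutionOrderSketch {
      // Per-table options (the ones passed to the DataFrame reader) win over the
      // global SparkConf value; the hard-coded default is the last resort.
      def resolveCacheBlocks(tableParams: Map[String, String], globalConf: SparkConf): Boolean =
        tableParams.get("hbase.spark.query.cacheblocks")
          .map(_.toBoolean)
          .getOrElse(globalConf.getBoolean("hbase.spark.query.cacheblocks", defaultValue = false))

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().set("hbase.spark.query.cacheblocks", "true")
        println(resolveCacheBlocks(Map("hbase.spark.query.cacheblocks" -> "false"), conf)) // per-table wins: false
        println(resolveCacheBlocks(Map.empty, conf))                                       // falls back to SparkConf: true
      }
    }
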
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 0f20d1d..8c1cb35 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -20,35 +20,45 @@ package org.apache.hadoop.hbase.spark.datasources
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+/**
+ * This is the hbase configuration. User can either set them in SparkConf, which
+ * will take effect globally, or configure it per table, which will overwrite the value
+ * set in SparkConf. If not set, the default value will take effect.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 object HBaseSparkConf{
-  // This is the hbase configuration. User can either set them in SparkConf, which
-  // will take effect globally, or configure it per table, which will overwrite the value
-  // set in SparkConf. If not setted, the default value will take effect.
-  val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
-  // default block cache is set to true by default following hbase convention, but note that
-  // this potentially may slow down the system
-  val defaultBlockCacheEnable = true
-  val CACHE_SIZE = "spark.hbase.cacheSize"
-  val defaultCachingSize = 1000
-  val BATCH_NUM = "spark.hbase.batchNum"
-  val defaultBatchNum = 1000
-  val BULKGET_SIZE = "spark.hbase.bulkGetSize"
-  val defaultBulkGetSize = 1000
-
-  val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
-  val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
-  val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
-  val defaultPushDownColumnFilter = true
-
+  /** Set to false to disable server-side caching of blocks for this scan,
+   * false by default, since full table scans generate too much BC churn.
+   */
+  val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
+  val DEFAULT_QUERY_CACHEBLOCKS = false
+  /** The number of rows for caching that will be passed to scan. */
+  val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
+  /** Set the maximum number of values to return for each call to next() in scan. */
+  val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
+  /** The number of BulkGets send to HBase. */
+  val BULKGET_SIZE = "hbase.spark.bulkget.size"
+  val DEFAULT_BULKGET_SIZE = 1000
+  /** Set to specify the location of hbase configuration file. */
+  val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
+  /** Set to specify whether create or use latest cached HBaseContext*/
+  val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
+  val DEFAULT_USE_HBASECONTEXT = true
+  /** Pushdown the filter to data source engine to increase the performance of queries. */
+  val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
+  val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
+  /** Class name of the encoder, which encode data types from Spark to HBase bytes. */
+  val QUERY_ENCODER = "hbase.spark.query.encoder"
+  val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
+  /** The timestamp used to filter columns with a specific timestamp. */
   val TIMESTAMP = "hbase.spark.query.timestamp"
-  val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
-  val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+  /** The starting timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_START = "hbase.spark.query.timerange.start"
+  /** The ending timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_END = "hbase.spark.query.timerange.end"
+  /** The maximum number of version to return. */
   val MAX_VERSIONS = "hbase.spark.query.maxVersions"
-  val ENCODER = "hbase.spark.query.encoder"
-  val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
-
-  // in milliseconds
-  val connectionCloseDelay = 10 * 60 * 1000
+  /** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
+  val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
 }


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 7b8b844..3bce041 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -116,9 +116,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
   logInfo(" - created table")
   val sparkConf = new SparkConf
-  sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-  sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-  sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+  sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+  sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+  sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
   sc = new SparkContext("local", "test", sparkConf)
@@ -791,7 +791,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       |}""".stripMargin
   df = sqlContext.load("org.apache.hadoop.hbase.spark",
     Map(HBaseTableCatalog.tableCatalog->catalog,
-      HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
+      HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
   df.registerTempTable("hbaseNoPushDownTmp")
@@ -913,8 +913,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   // Test Getting old stuff -- Full Scan, TimeRange
   val oldRange = sqlContext.read
-    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-      HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+      HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
     .format("org.apache.hadoop.hbase.spark")
     .load()
   assert(oldRange.count() == 101)
@@ -924,8 +924,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   // Test Getting middle stuff -- Full Scan, TimeRange
   val middleRange = sqlContext.read
-    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-      HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+      HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
     .format("org.apache.hadoop.hbase.spark")
     .load()
   assert(middleRange.count() == 256)
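
The suite updates above exercise both ways the renamed keys are consumed: globally through SparkConf and per-read through the DataFrame reader options, with the per-read values taking precedence. A self-contained sketch along the same lines (the catalog JSON contents and the timestamp cutoff are placeholders, not values from the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}

    object NewKeysUsageSketch {
      def main(args: Array[String]): Unit = {
        // Global defaults for every HBase-backed read in this application.
        val sparkConf = new SparkConf()
          .set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
          .set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
          .set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
        val sc = new SparkContext("local", "example", sparkConf)
        val sqlContext = new SQLContext(sc)

        // Placeholder catalog mapping a DataFrame schema onto an HBase table.
        val catalog =
          s"""{"table":{"namespace":"default", "name":"table1"},
             |"rowkey":"key",
             |"columns":{
             |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
             |"A_FIELD":{"cf":"c", "col":"a", "type":"string"}}}""".stripMargin

        // Per-read options override the SparkConf defaults; the new time-range
        // keys bound the scan to cells written before an arbitrary cutoff.
        val df = sqlContext.read
          .options(Map(
            HBaseTableCatalog.tableCatalog -> catalog,
            HBaseSparkConf.TIMERANGE_START -> "0",
            HBaseSparkConf.TIMERANGE_END -> "1488000000000"))
          .format("org.apache.hadoop.hbase.spark")
          .load()
        df.show()
      }
    }
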
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index b9c15ce..bc833e8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 class DynamicLogicExpressionSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
   with Logging {
-  val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+  val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
   test("Basic And Test") {
     val leftLogic = new LessThanLogicExpression("Col1", 0)


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
index 83465d9..ccb4625 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
@@ -49,13 +49,12 @@ case class DummyScan(
   override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
     .map(Row(_))
     .map{ x =>
-      if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
-        HBaseSparkConf.defaultBatchNum) != batchNum ||
-        sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
-          HBaseSparkConf.defaultCachingSize) != cacheSize ||
-        sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
-          HBaseSparkConf.defaultBlockCacheEnable)
-        != blockCachingEnable) {
+      if (sparkConf.getInt(HBaseSparkConf.QUERY_BATCHSIZE,
+        -1) != batchNum ||
+        sparkConf.getInt(HBaseSparkConf.QUERY_CACHEDROWS,
+          -1) != cacheSize ||
+        sparkConf.getBoolean(HBaseSparkConf.QUERY_CACHEBLOCKS,
+          false) != blockCachingEnable) {
        throw new Exception("HBase Spark configuration cannot be set properly")
      }
      x


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
index d33ced9..f47a319 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
@@ -69,9 +69,9 @@ class PartitionFilterSuite extends FunSuite with
   TEST_UTIL.startMiniCluster
   val sparkConf = new SparkConf
-  sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-  sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-  sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+  sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+  sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+  sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
   sc = new SparkContext("local", "test", sparkConf)
   new HBaseContext(sc, TEST_UTIL.getConfiguration
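
For jobs written against the old key strings, the patch amounts to the renames listed below; a small helper like this (not part of the commit, purely illustrative) can carry values set under the old names over to the new ones. Keys whose string value did not change, such as hbase.spark.query.timestamp, hbase.spark.query.maxVersions, and hbase.spark.query.encoder, are omitted.

    import org.apache.spark.SparkConf

    object ConfigKeyMigration {
      // Old configuration key -> new key, as renamed by this commit.
      val renamedKeys: Map[String, String] = Map(
        "spark.hbase.blockcache.enable"  -> "hbase.spark.query.cacheblocks",
        "spark.hbase.cacheSize"          -> "hbase.spark.query.cachedrows",
        "spark.hbase.batchNum"           -> "hbase.spark.query.batchsize",
        "spark.hbase.bulkGetSize"        -> "hbase.spark.bulkget.size",
        "hbase.config.resources"         -> "hbase.spark.config.location",
        "hbase.use.hbase.context"        -> "hbase.spark.use.hbasecontext",
        "hbase.pushdown.column.filter"   -> "hbase.spark.pushdown.columnfilter",
        "hbase.spark.query.minTimestamp" -> "hbase.spark.query.timerange.start",
        "hbase.spark.query.maxTimestamp" -> "hbase.spark.query.timerange.end")

      /** Copy any value set under an old key to its renamed counterpart. */
      def migrate(conf: SparkConf): SparkConf = {
        renamedKeys.foreach { case (oldKey, newKey) =>
          if (conf.contains(oldKey) && !conf.contains(newKey)) {
            conf.set(newKey, conf.get(oldKey))
          }
        }
        conf
      }
    }
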