Repository: bahir
Updated Branches:
  refs/heads/master abfdc706f -> fd4c35fc9

[BAHIR-102] Initial support of Cloudant Query and examples

Add optimization to use query in particular scenarios.

Closes #41.

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/fd4c35fc
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/fd4c35fc
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/fd4c35fc

Branch: refs/heads/master
Commit: fd4c35fc9f7ebb57464d231cf5d66e7bc4096a1b
Parents: abfdc70
Author: Yang Lei <genia...@gmail.com>
Authored: Fri Apr 7 19:23:43 2017 -0400
Committer: Luciano Resende <lrese...@apache.org>
Committed: Thu Apr 13 12:15:10 2017 -0700

----------------------------------------------------------------------
 sql-cloudant/README.md                          |   2 +
 sql-cloudant/examples/python/CloudantQuery.py   |  65 ++++++++++
 sql-cloudant/examples/python/CloudantQueryDF.py |  61 +++++++++
 .../src/main/resources/application.conf         |   2 +
 .../apache/bahir/cloudant/CloudantConfig.scala  |  94 ++++++--------
 .../apache/bahir/cloudant/DefaultSource.scala   |  32 +----
 .../common/JsonStoreConfigManager.scala         |  62 ++-------
 .../cloudant/common/JsonStoreDataAccess.scala   |  79 ++++++------
 .../bahir/cloudant/common/JsonStoreRDD.scala    | 129 ++++++++++++++++---
 9 files changed, 338 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index eaa8893..38d2bbb 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -62,6 +62,8 @@ cloudant.protocol|https|protocol to use to transfer data: http or https
 cloudant.host||cloudant host url
 cloudant.username||cloudant userid
 cloudant.password||cloudant password
+cloudant.useQuery|false|By default, the _all_docs endpoint is used if the configurations 'view' and 'index' (see below) are not set. When useQuery is enabled, the _find endpoint is used in place of _all_docs whenever the query condition is not on the primary key field (_id), so that query predicates can be pushed down to the datastore.
+cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
 jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
 jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
 jsonstore.rdd.minInPartition|10|the min rows in a partition.
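
The two new options behave like any other connector option. A minimal usage
sketch in Scala (host, credentials, and database name are placeholders, not
part of this commit):

    import org.apache.spark.sql.SparkSession

    // With useQuery enabled and no 'view'/'index' option set, predicates on
    // non-_id fields are served via POST /<db>/_find instead of GET /<db>/_all_docs.
    val spark = SparkSession.builder()
      .appName("Cloudant Query sketch")
      .config("cloudant.host", "ACCOUNT.cloudant.com")
      .config("cloudant.username", "USERNAME")
      .config("cloudant.password", "PASSWORD")
      .config("cloudant.useQuery", "true")   // default: false
      .config("cloudant.queryLimit", "100")  // default: 25
      .getOrCreate()

    val df = spark.read.format("org.apache.bahir.cloudant").load("n_airportcodemapping")
    df.filter(df("airportName") >= "Moscow").select("_id", "airportName").show()
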

http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/examples/python/CloudantQuery.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantQuery.py b/sql-cloudant/examples/python/CloudantQuery.py
new file mode 100644
index 0000000..5ca5c44
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantQuery.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using query")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .config("cloudant.useQuery", "true")\
+    .config("schemaSampleSize",1)\
+    .getOrCreate()
+
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable1 WHERE airportName == 'Moscow' ")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable2 WHERE airportName > 'Moscow' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable3 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable3 WHERE airportName > 'Moscow' AND airportName < 'Sydney' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW flight1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight')")
+flightData = spark.sql("SELECT flightSegmentId, economyClassBaseCost, numFirstClassSeats FROM flight1 WHERE economyClassBaseCost >=200 AND numFirstClassSeats<=10")
+flightData.printSchema()
+print 'Total # of rows in flightData: ' + str(flightData.count())
+flightData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW flight2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight')")
+flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime, scheduledArrivalTime FROM flight2 WHERE scheduledDepartureTime >='2014-12-15T05:00:00.000Z' AND scheduledArrivalTime <='2014-12-15T11:04:00.000Z'")
+flightData.printSchema()
+print 'Total # of rows in flightData: ' + str(flightData.count())
+flightData.show()
+
+
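
For the first query above (WHERE airportName == 'Moscow'), the pushed-down
Spark filter is turned into a Cloudant Query (Mango) selector and POSTed to
the _find endpoint. A sketch of the resulting request body, mirroring
convertToMangoJson and compute() in JsonStoreRDD.scala further down (the
limit shown is the cloudant.queryLimit default):

    import play.api.libs.json.Json

    // EqualTo("airportName", "Moscow")  ->  {"airportName": {"$eq": "Moscow"}}
    val selector = Json.obj("airportName" -> Json.obj("$eq" -> "Moscow"))
    val postData = Json.stringify(Json.obj(
      "selector" -> selector,
      "fields" -> Json.arr("_id", "airportName"),
      "limit" -> 25))
    // => {"selector":{"airportName":{"$eq":"Moscow"}},
    //     "fields":["_id","airportName"],"limit":25}
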

http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/examples/python/CloudantQueryDF.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantQueryDF.py b/sql-cloudant/examples/python/CloudantQueryDF.py
new file mode 100644
index 0000000..c8fa296
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantQueryDF.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using query")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .config("cloudant.useQuery", "true")\
+    .config("schemaSampleSize",1)\
+    .getOrCreate()
+
+
+# ***0. Loading dataframe from Cloudant db with one String field equality condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName == 'Moscow').select("_id",'airportName').show()
+
+
+# ***1. Loading dataframe from Cloudant db with one String field range condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName > 'Moscow').select("_id",'airportName').show()
+
+# ***2. Loading dataframe from Cloudant db with two String field conditions
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName > 'Moscow').filter(df.airportName < 'Sydney').select("_id",'airportName').show()
+
+# ***3. Loading dataframe from Cloudant db with two int field conditions
+df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.economyClassBaseCost >= 200).filter(df.numFirstClassSeats <=10).select('flightSegmentId','scheduledDepartureTime', 'scheduledArrivalTime').show()
+
+# ***4. Loading dataframe from Cloudant db with two timestamp field conditions
+df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.scheduledDepartureTime >= "2014-12-15T05:00:00.000Z").filter(df.scheduledArrivalTime <="2014-12-15T11:04:00.000Z").select('flightSegmentId','scheduledDepartureTime', 'scheduledArrivalTime').show()
+
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 2d8b236..80dea91 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -10,5 +10,7 @@ spark-sql {
   }
   cloudant = {
     protocol = https
+    useQuery = false
+    queryLimit = 25
   }
 }
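
These entries only define packaged defaults, loaded in JsonStoreConfigManager
through ConfigFactory.load(); values supplied via SparkConf or per-read options
take precedence (resolution order assumed from the getBool/getInt helpers,
which are not shown in this diff). A sketch of overriding them from Scala:

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      .set("cloudant.useQuery", "true")   // packaged default above: false
      .set("cloudant.queryLimit", "50")   // packaged default above: 25
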
"/_all_docs?startkey=%22_design0/%22&limit=1&include_docs=true" - } - - def getAllDocsUrlExcludeDDoc(limit: Int): String = { if (viewName == null) { - dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=" + limit + "&include_docs=true" - } else { - dbUrl + "/" + viewName + "?limit=1" - } - } - - def getAllDocsUrl(limit: Int): String = { - if (viewName == null) { - if (limit == SCHEMA_FOR_ALL_DOCS_NUM) { - dbUrl + "/_all_docs?include_docs=true" + val baseUrl = ( + if ( excludeDDoc) dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true" + else dbUrl + "/_all_docs?include_docs=true" + ) + if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) { + baseUrl } else { - dbUrl + "/_all_docs?limit=" + limit + "&include_docs=true" + baseUrl + "&limit=" + limit } } else { - if (limit == JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM) { + if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) { dbUrl + "/" + viewName } else { dbUrl + "/" + viewName + "?limit=" + limit @@ -132,22 +108,23 @@ class CloudantConfig(val protocol: String, val host: String, def getRangeUrl(field: String = null, start: Any = null, startInclusive: Boolean = false, end: Any = null, endInclusive: Boolean = false, - includeDoc: Boolean = true): (String, Boolean) = { - val (url: String, pusheddown: Boolean) = - calculate(field, start, startInclusive, end, endInclusive) - if (includeDoc) { + includeDoc: Boolean = true, + allowQuery: Boolean = false): (String, Boolean, Boolean) = { + val (url: String, pusheddown: Boolean, queryUsed: Boolean) = + calculate(field, start, startInclusive, end, endInclusive, allowQuery) + if (includeDoc && !queryUsed ) { if (url.indexOf('?') > 0) { - (url + "&include_docs=true", pusheddown) + (url + "&include_docs=true", pusheddown, queryUsed) } else { - (url + "?include_docs=true", pusheddown) + (url + "?include_docs=true", pusheddown, queryUsed) } } else { - (url, pusheddown) + (url, pusheddown, queryUsed) } } private def calculate(field: String, start: Any, startInclusive: Boolean, - end: Any, endInclusive: Boolean): (String, Boolean) = { + end: Any, endInclusive: Boolean, allowQuery: Boolean): (String, Boolean, Boolean) = { if (field != null && field.equals(pkField)) { var condition = "" if (start != null && end != null && start.equals(end)) { @@ -166,16 +143,18 @@ class CloudantConfig(val protocol: String, val host: String, condition += "endkey=%22" + URLEncoder.encode(end.toString(), "UTF-8") + "%22" } } - (dbUrl + "/_all_docs" + condition, true) + (dbUrl + "/_all_docs" + condition, true, false) } else if (indexName!=null) { // push down to indexName val condition = calculateCondition(field, start, startInclusive, end, endInclusive) - (dbUrl + "/" + indexName + "?q=" + condition, true) + (dbUrl + "/" + indexName + "?q=" + condition, true, false) } else if (viewName != null) { - (dbUrl + "/" + viewName, true) + (dbUrl + "/" + viewName, false, false) + } else if (allowQuery && useQuery) { + (s"$dbUrl/_find", false, true) } else { - (s"$dbUrl/$defaultIndex", false) + (s"$dbUrl/$defaultIndex", false, false) } } @@ -215,20 +194,21 @@ class CloudantConfig(val protocol: String, val host: String, } } - def getSubSetUrl (url: String, skip: Int, limit: Int) - (implicit convertSkip: (Int) => String): String = { + def getSubSetUrl (url: String, skip: Int, limit: Int, queryUsed: Boolean): String = { val suffix = { if (url.indexOf("_all_docs")>0) "include_docs=true&limit=" + limit + "&skip=" + skip - else if (url.indexOf("_changes")>0) "include_docs=true&limit=" + - limit + "&since=" + convertSkip(skip) else if 
       else if (viewName != null) {
         "limit=" + limit + "&skip=" + skip
+      } else if (queryUsed) {
+        ""
       } else {
         "include_docs=true&limit=" + limit
       } // TODO Index query does not support subset query. Should disable Partitioned loading?
     }
-    if (url.indexOf('?') > 0) {
+    if (suffix.length==0) {
+      url
+    } else if (url.indexOf('?') > 0) {
       url + "&" + suffix
     } else {
@@ -246,8 +226,10 @@ class CloudantConfig(val protocol: String, val host: String,
     }
   }
 
-  def getRows(result: JsValue): Seq[JsValue] = {
-    if (viewName == null) {
+  def getRows(result: JsValue, queryUsed: Boolean): Seq[JsValue] = {
+    if ( queryUsed ) {
+      ((result \ "docs").as[JsArray]).value.map(row => row)
+    } else if ( viewName == null) {
       ((result \ "rows").as[JsArray]).value.map(row => (row \ "doc").get)
     } else {
       ((result \ "rows").as[JsArray]).value.map(row => row)
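
Taken together, getRangeUrl/calculate now return a (url, pusheddown, queryUsed)
triple. A sketch of the new fall-through, where config is assumed to be a
CloudantConfig built with useQuery=true and neither 'view' nor 'index' set:

    // A non-_id field cannot be served by an _all_docs key range, so with
    // allowQuery=true the calculate() chain ends at the _find endpoint:
    val (url, pusheddown, queryUsed) =
      config.getRangeUrl(field = "airportName", start = "Moscow",
        startInclusive = false, end = null, endInclusive = false,
        includeDoc = false, allowQuery = true)
    // url == s"$dbUrl/_find", pusheddown == false, queryUsed == true.
    // Note that include_docs is never appended when queryUsed is true:
    // _find responses already carry the documents in their "docs" array.
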

http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 4c973f7..deab22a 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
-import org.apache.bahir.cloudant.common.{FilterInterpreter, JsonStoreDataAccess, JsonStoreRDD, _}
+import org.apache.bahir.cloudant.common.{JsonStoreDataAccess, JsonStoreRDD, _}
 
 case class CloudantReadWriteRelation (config: CloudantConfig,
                                       schema: StructType,
@@ -49,23 +49,11 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
         allDocsDF.select(requiredColumns(0), colsExceptCol0: _*).rdd
       }
     } else {
-      val filterInterpreter = new FilterInterpreter(filters)
-      var searchField: String = {
-        if (filterInterpreter.containsFiltersFor(config.pkField)) {
-          config.pkField
-        } else {
-          filterInterpreter.firstField
-        }
-      }
-
-      val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
-      implicit val columns = requiredColumns
-      val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
-        min, minInclusive, max, maxInclusive, false)
-      if (!pusheddown) searchField = null
-      implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
+      implicit val columns : Array[String] = requiredColumns
+      implicit val origFilters : Array[Filter] = filters
 
-      val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+      logger.info("buildScan:" + columns + "," + origFilters)
+      val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
       val df = sqlContext.read.json(cloudantRDD)
       if (colsLength > 1) {
         val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
@@ -117,16 +105,10 @@ class DefaultSource extends RelationProvider
       inSchema
     } else {
       val df = if (config.getSchemaSampleSize() ==
-        JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM &&
+        JsonStoreConfigManager.ALL_DOCS_LIMIT &&
         config.viewName == null
         && config.indexName == null) {
-        val filterInterpreter = new FilterInterpreter(null)
-        var searchField = null
-        val (min, minInclusive, max, maxInclusive) =
-          filterInterpreter.getInfo(searchField)
-        val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
-          min, minInclusive, max, maxInclusive, false)
-        val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+        val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
         allDocsDF = sqlContext.read.json(cloudantRDD)
         allDocsDF
       } else {

http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 92192bb..38c5006 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -25,13 +25,14 @@ import org.apache.bahir.cloudant.CloudantConfig
 
 object JsonStoreConfigManager {
   val CLOUDANT_CONNECTOR_VERSION = "2.0.0"
-  val SCHEMA_FOR_ALL_DOCS_NUM = -1
+  val ALL_DOCS_LIMIT = -1
 
   private val CLOUDANT_HOST_CONFIG = "cloudant.host"
   private val CLOUDANT_USERNAME_CONFIG = "cloudant.username"
   private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password"
   private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol"
-
+  private val USE_QUERY_CONFIG = "cloudant.useQuery"
+  private val QUERY_LIMIT_CONFIG = "cloudant.queryLimit"
 
   private val PARTITION_CONFIG = "jsonstore.rdd.partitions"
   private val MAX_IN_PARTITION_CONFIG = "jsonstore.rdd.maxInPartition"
@@ -39,7 +40,7 @@ import org.apache.bahir.cloudant.CloudantConfig
   private val REQUEST_TIMEOUT_CONFIG = "jsonstore.rdd.requestTimeout"
   private val BULK_SIZE_CONFIG = "bulkSize"
   private val SCHEMA_SAMPLE_SIZE_CONFIG = "schemaSampleSize"
-  private val CREATE_DB_ON_SAVE = "createDBOnSave"
+  private val CREATE_DB_ON_SAVE_CONFIG = "createDBOnSave"
 
   private val configFactory = ConfigFactory.load()
 
@@ -139,6 +140,10 @@ import org.apache.bahir.cloudant.CloudantConfig
 
   def getConfig(context: SQLContext, parameters: Map[String, String]): CloudantConfig = {
     val sparkConf = context.sparkContext.getConf
+    getConfig(sparkConf, parameters)
+  }
+
+  def getConfig (sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
 
     implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
     implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
@@ -146,67 +151,28 @@ import org.apache.bahir.cloudant.CloudantConfig
     implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
     implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
     implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
-    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
+    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
+
+    implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
+    implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
 
     val dbName = parameters.getOrElse("database", parameters.getOrElse("path", null))
     val indexName = parameters.getOrElse("index", null)
     val viewName = parameters.getOrElse("view", null)
-
-    // FIXME: Add logger
-    // scalastyle:off println
-    println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
-      s"indexName=$indexName, viewName=$viewName," +
-      s"$PARTITION_CONFIG=$total, $MAX_IN_PARTITION_CONFIG=$max," +
-      s"$MIN_IN_PARTITION_CONFIG=$min, $REQUEST_TIMEOUT_CONFIG=$requestTimeout," +
-      s"$BULK_SIZE_CONFIG=$bulkSize, $SCHEMA_SAMPLE_SIZE_CONFIG=$schemaSampleSize")
-    // scalastyle:on println
+    val selector = parameters.getOrElse("selector", null)
 
     val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
     val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
     val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
     val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
-    val selector = getString(sparkConf, parameters, "selector")
 
     if (host != null) {
       new CloudantConfig(protocol, host, dbName, indexName, viewName)
         (user, passwd, total, max, min, requestTimeout, bulkSize,
-        schemaSampleSize, createDBOnSave, selector)
+        schemaSampleSize, createDBOnSave, selector, useQuery, queryLimit)
     } else {
       throw new RuntimeException("Spark configuration is invalid! " +
         "Please make sure to supply required values for cloudant.host.")
     }
   }
-
-  def getConfig(sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
-
-    implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
-    implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
-    implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
-    implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
-    implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
-    implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
-    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
-
-    val dbName = parameters.getOrElse("database", null)
-
-    // scalastyle:off println
-    println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
-      s"$REQUEST_TIMEOUT_CONFIG=$requestTimeout")
-    // scalastyle:on println
-
-    val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
-    val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
-    val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
-    val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
-    val selector = getString(sparkConf, parameters, "selector")
-
-    if (host != null) {
-      new CloudantConfig(protocol, host, dbName)(user, passwd,
-        total, max, min, requestTimeout, bulkSize,
-        schemaSampleSize, createDBOnSave, selector)
-    } else {
-      throw new RuntimeException("Cloudant parameters are invalid!" +
+ - "Please make sure to supply required values for cloudant.host.") - } - } } http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala index e84a44c..ac79359 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala @@ -39,19 +39,6 @@ class JsonStoreDataAccess (config: CloudantConfig) { lazy val logger = LoggerFactory.getLogger(getClass) implicit lazy val timeout = config.requestTimeout - def getOne()( implicit columns: Array[String] = null): Seq[String] = { - var r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc1(), processAll) - if (r.size == 0 ) { - r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc2(), processAll) - } - if (r.size == 0) { - throw new RuntimeException("Database " + config.getDbname() + - " doesn't have any non-design documents!") - } else { - r - } - } - def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = { if (limit == 0) { throw new RuntimeException("Database " + config.getDbname() + @@ -63,7 +50,7 @@ class JsonStoreDataAccess (config: CloudantConfig) { } var r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit), processAll) if (r.size == 0) { - r = this.getQueryResult[Seq[String]](config.getAllDocsUrlExcludeDDoc(limit), processAll) + r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit, true), processAll) } if (r.size == 0) { throw new RuntimeException("Database " + config.getDbname() + @@ -74,40 +61,34 @@ class JsonStoreDataAccess (config: CloudantConfig) { } def getAll[T](url: String) - (implicit columns: Array[String] = null, - attrToFilters: Map[String, Array[Filter]] = null): Seq[String] = { + (implicit columns: Array[String] = null): Seq[String] = { this.getQueryResult[Seq[String]](url, processAll) } def getIterator(skip: Int, limit: Int, url: String) (implicit columns: Array[String] = null, - attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = { - implicit def convertSkip(skip: Int): String = { - val url = config.getLastUrl(skip) - if (url == null) { - skip.toString() - } else { - this.getQueryResult[String](url, - { result => config.getLastNum(Json.parse(result)).as[JsString].value}) - } - } - val newUrl = config.getSubSetUrl(url, skip, limit) + postData: String = null): Iterator[String] = { + val newUrl = config.getSubSetUrl(url, skip, limit, postData!=null) this.getQueryResult[Iterator[String]](newUrl, processIterator) } - def getTotalRows(url: String): Int = { - val totalUrl = config.getTotalUrl(url) - this.getQueryResult[Int](totalUrl, - { result => config.getTotalRows(Json.parse(result))}) + def getTotalRows(url: String, queryUsed: Boolean) + (implicit postData: String = null): Int = { + if (queryUsed) config.queryLimit // Query can not retrieve total row now. 
+    else {
+      val totalUrl = config.getTotalUrl(url)
+      this.getQueryResult[Int](totalUrl,
+        { result => config.getTotalRows(Json.parse(result))})
+    }
   }
 
   private def processAll (result: String)
       (implicit columns: Array[String],
-      attrToFilters: Map[String, Array[Filter]] = null) = {
-    logger.debug(s"processAll columns:$columns, attrToFilters:$attrToFilters")
+      postData: String = null) = {
+    logger.debug(s"processAll:$result, columns:$columns")
     val jsonResult: JsValue = Json.parse(result)
-    var rows = config.getRows(jsonResult)
-    if (config.viewName == null) {
+    var rows = config.getRows(jsonResult, postData!=null )
+    if (config.viewName == null && postData==null) {
       // filter design docs
       rows = rows.filter(r => FilterDDocs.filter(r))
     }
@@ -116,7 +97,7 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
 
   private def processIterator (result: String)
     (implicit columns: Array[String],
-     attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
+     postData: String = null): Iterator[String] = {
     processAll(result).iterator
   }
 
@@ -137,23 +118,39 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
     getQueryResult(url, processResults)
   }
 
-
   private def getQueryResult[T]
       (url: String, postProcessor: (String) => T)
      (implicit columns: Array[String] = null,
-      attrToFilters: Map[String, Array[Filter]] = null) : T = {
-    logger.warn("Loading data from Cloudant using query: " + url)
+      postData: String = null) : T = {
+    logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
     val requestTimeout = config.requestTimeout.toInt
     val clRequest: HttpRequest = config.username match {
       case null =>
-        Http(url)
+        if (postData!=null) {
+          Http(url)
+            .postData(postData)
+            .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+            .header("Content-Type", "application/json")
+            .header("User-Agent", "spark-cloudant")
+        } else {
+          Http(url)
           .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
           .header("User-Agent", "spark-cloudant")
+        }
      case _ =>
-        Http(url)
+        if (postData!=null) {
+          Http(url)
+            .postData(postData)
+            .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+            .header("Content-Type", "application/json")
+            .header("User-Agent", "spark-cloudant")
+            .auth(config.username, config.password)
+        } else {
+          Http(url)
          .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
          .header("User-Agent", "spark-cloudant")
          .auth(config.username, config.password)
+        }
    }
 
     val clResponse: HttpResponse[String] = clRequest.execute()

http://git-wip-us.apache.org/repos/asf/bahir/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
index 46774f5..46ba912 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
@@ -17,12 +17,13 @@
 package org.apache.bahir.cloudant.common
 
 import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsNull, Json, JsString, JsValue}
 
 import org.apache.spark.Partition
 import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources._
 
 import org.apache.bahir.cloudant.CloudantConfig
 
@@ -31,9 +32,9 @@ import org.apache.bahir.cloudant.CloudantConfig
  * the limit rows returns and the skipped rows.
  */
 
-private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
-    val idx: Int, val config: CloudantConfig,
-    val attrToFilters: Map[String, Array[Filter]])
+private[cloudant] class JsonStoreRDDPartition(val url: String, val skip: Int, val limit: Int,
+    val idx: Int, val config: CloudantConfig, val selector: JsValue, val fields: JsValue,
+    val queryUsed: Boolean)
   extends Partition with Serializable{
   val index = idx
 }
@@ -46,16 +47,15 @@ private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
  * and minInPartition / maxInPartition )
  * maxRowsInPartition: -1 means unlimited
 */
-class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
-    url: String)(implicit requiredcolumns: Array[String] = null,
-    attrToFilters: Map[String, Array[Filter]] = null)
+class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
+    (implicit requiredcolumns: Array[String] = null,
+    filters: Array[Filter] = null)
   extends RDD[String](sc, Nil) {
 
-  lazy val totalRows = {
-    new JsonStoreDataAccess(config).getTotalRows(url)
-  }
-  lazy val totalPartition = {
-    if (totalRows == 0 || ! config.allowPartition() ) 1
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private def getTotalPartition(totalRows: Int, queryUsed: Boolean): Int = {
+    if (totalRows == 0 || ! config.allowPartition(queryUsed) ) 1
     else if (totalRows < config.partitions * config.minInPartition) {
       val total = totalRows / config.minInPartition
       if (total == 0 ) {
@@ -76,7 +76,7 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
     }
   }
 
-  lazy val limitPerPartition = {
+  private def getLimitPerPartition(totalRows: Int, totalPartition: Int): Int = {
     val limit = totalRows/totalPartition
     if (totalRows % totalPartition != 0) {
       limit + 1
@@ -85,22 +85,115 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
     }
   }
 
+  private def convertToMangoJson(f: Filter): (String, JsValue) = {
+    val (op, value): (String, Any) = f match {
+      case EqualTo(attr, v) => ("$eq", v)
+      case GreaterThan(attr, v) => ("$gt", v)
+      case LessThan(attr, v) => ("$lt", v)
+      case GreaterThanOrEqual(attr, v) => ("$gte", v)
+      case LessThanOrEqual(attr, v) => ("$lte", v)
+      case _ => (null, null)
+    }
+    val convertedV: JsValue = {
+      // TODO Better handling of other types
+      if (value != null) {
+        value match {
+          case s: String => Json.toJson(s)
+          case l: Long => Json.toJson(l)
+          case d: Double => Json.toJson(d)
+          case i: Int => Json.toJson(i)
+          case b: Boolean => Json.toJson(b)
+          case t: java.sql.Timestamp => Json.toJson(t)
+          case a: Any => logger.debug(s"Ignoring filter, cannot handle the datatype of its value: $a"); null
+        }
+      } else null
+    }
+    (op, convertedV)
+  }
+
+  private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsValue] = {
+    filters.map(af => convertToMangoJson(af))
+      .filter(x => x._2 != null)
+      .toMap
+  }
+
   override def getPartitions: Array[Partition] = {
-    val logger = LoggerFactory.getLogger(getClass)
+
+    logger.info("getPartitions:" + requiredcolumns + "," + filters)
+
+    val filterInterpreter = new FilterInterpreter(filters)
+    val origAttrToFilters = ( if (filters==null || filters.length==0) null
+      else filterInterpreter.getFiltersForPostProcess(null))
+
+    val (selector, fields) : (JsValue, JsValue) = {
+      if (!config.queryEnabled() || origAttrToFilters == null) (null, null)
+      else {
+        val selectors: Map[String, Map[String, JsValue]] =
+          origAttrToFilters.transform( (name, attrFilters) => convertAttrToMangoJson(attrFilters))
+        val filteredSelectors = selectors.filter((t) => ! t._2.isEmpty)
+
+        if (! filteredSelectors.isEmpty) {
+          val queryColumns = (
+            if (requiredcolumns == null || requiredcolumns.size == 0) null
+            else Json.toJson(requiredcolumns))
+          (Json.toJson(filteredSelectors), queryColumns)
+        } else (null, null)
+      }
+    }
+
+    logger.info("calculated selector and fields:" + selector + "," + fields)
+
+    var searchField: String = {
+      if (origAttrToFilters ==null ) null
+      else if (filterInterpreter.containsFiltersFor(config.pkField)) {
+        config.pkField
+      } else {
+        filterInterpreter.firstField
+      }
+    }
+
+    val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
+    val (url: String, pusheddown: Boolean, queryUsed: Boolean) = config.getRangeUrl(searchField,
+      min, minInclusive, max, maxInclusive, false, selector!=null)
+
+    implicit val postData : String = {
+      if (queryUsed) {
+        Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
+      } else {
+        null
+      }
+    }
+    val totalRows = new JsonStoreDataAccess(config).getTotalRows(url, queryUsed)
+    val totalPartition = getTotalPartition(totalRows, queryUsed)
+    val limitPerPartition = getLimitPerPartition(totalRows, totalPartition)
+
     logger.info(s"Partition config - total=$totalPartition, " +
       s"limit=$limitPerPartition for totalRows of $totalRows")
-    (0 until totalPartition).map(i => {
+    logger.info(s"Partition query info - url=$url, queryUsed=$queryUsed")
+
+    (0 until totalPartition).map(i => {
       val skip = i * limitPerPartition
-      new JsonStoreRDDPartition(skip, limitPerPartition, i, config,
-        attrToFilters).asInstanceOf[Partition]
+      new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
+        config, selector, fields, queryUsed).asInstanceOf[Partition]
     }).toArray
   }
 
   override def compute(splitIn: Partition, context: TaskContext): Iterator[String] = {
     val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
+    implicit val postData : String = {
+      if (myPartition.queryUsed && myPartition.fields !=null) {
+        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
+          "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+      } else if (myPartition.queryUsed) {
+        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
+          "skip" -> myPartition.skip))
+      } else {
+        null
+      }
+    }
     new JsonStoreDataAccess(myPartition.config).getIterator(myPartition.skip,
-      myPartition.limit, url)
+      myPartition.limit, myPartition.url)
   }
 }
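
To make the per-partition request concrete, here is a small sketch of the POST
body that compute() above assembles for each JsonStoreRDDPartition
(partitionPostData is illustrative only, not part of the commit; the limit and
skip values assume a limitPerPartition of 13):

    import play.api.libs.json.{JsValue, Json}

    // Mirrors the postData built in compute(): "fields" is included only when
    // the scan projects specific columns.
    def partitionPostData(selector: JsValue, fields: JsValue,
        limit: Int, skip: Int): String = {
      if (fields != null) {
        Json.stringify(Json.obj("selector" -> selector, "fields" -> fields,
          "limit" -> limit, "skip" -> skip))
      } else {
        Json.stringify(Json.obj("selector" -> selector,
          "limit" -> limit, "skip" -> skip))
      }
    }

    // Partition 1 of a scan filtered on airportName > 'Moscow':
    partitionPostData(
      Json.obj("airportName" -> Json.obj("$gt" -> "Moscow")),
      Json.arr("_id", "airportName"), limit = 13, skip = 13)
    // => {"selector":{"airportName":{"$gt":"Moscow"}},
    //     "fields":["_id","airportName"],"limit":13,"skip":13}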