Repository: bahir Updated Branches: refs/heads/master 785ee1e1a -> 6ea42a896
[BAHIR-154] Refactor sql-cloudant to use cloudant-client library - Use java-cloudant's executeRequest for HTTP requests against _all_docs endpoint - Added HTTP 429 backoff with default settings - Simplified caught exception and message for schema size - Replaced scala http library with okhttp library for changes receiver - Updated streaming CloudantReceiver class to use improved ChangesRowScanner method - Replaced Play JSON with GSON library - Updated save operation to use java-cloudant bulk API - Use _changes feed filter option for Cloudant/CouchDB 2.x and greater Closes #61 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/6ea42a89 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/6ea42a89 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/6ea42a89 Branch: refs/heads/master Commit: 6ea42a8965d98773a342ad5cd31aab6b64e9d9bd Parents: 785ee1e Author: Esteban Laver <[email protected]> Authored: Sun Dec 17 13:30:04 2017 -0500 Committer: Luciano Resende <[email protected]> Committed: Fri Jan 26 07:43:07 2018 -0800 ---------------------------------------------------------------------- sql-cloudant/README.md | 1 + sql-cloudant/pom.xml | 21 +-- .../src/main/resources/application.conf | 1 + .../bahir/cloudant/CloudantChangesConfig.scala | 15 +- .../apache/bahir/cloudant/CloudantConfig.scala | 136 +++++++++------ .../bahir/cloudant/CloudantReceiver.scala | 77 ++++----- .../apache/bahir/cloudant/DefaultSource.scala | 13 +- .../bahir/cloudant/common/FilterUtil.scala | 15 +- .../common/JsonStoreConfigManager.scala | 63 ++++--- .../cloudant/common/JsonStoreDataAccess.scala | 172 ++++++------------- .../bahir/cloudant/common/JsonStoreRDD.scala | 55 +++--- .../apache/bahir/cloudant/common/JsonUtil.scala | 33 ++-- .../cloudant/internal/ChangesReceiver.scala | 69 ++++---- .../bahir/cloudant/CloudantOptionSuite.scala | 33 +++- 14 files changed, 358 insertions(+), 346 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/README.md ---------------------------------------------------------------------- diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index 160e279..b651990 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -63,6 +63,7 @@ cloudant.protocol|https|protocol to use to transfer data: http or https cloudant.host| |cloudant host url cloudant.username| |cloudant userid cloudant.password| |cloudant password +cloudant.numberOfRetries|3| number of times to replay a request that received a 429 `Too Many Requests` response cloudant.useQuery|false|by default, `_all_docs` endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, `_find` endpoint will be used in place of `_all_docs` when query condition is not on primary key field (_id), so that query predicates may be driven into datastore. cloudant.queryLimit|25|the maximum number of results returned when querying the `_find` endpoint. cloudant.storageLevel|MEMORY_ONLY|the storage level for persisting Spark RDDs during load when `cloudant.endpoint` is set to `_changes`. See [RDD Persistence section](https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) in Spark's Progamming Guide for all available storage level options. 
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/pom.xml ---------------------------------------------------------------------- diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml index 55a5210..672418a 100644 --- a/sql-cloudant/pom.xml +++ b/sql-cloudant/pom.xml @@ -36,8 +36,14 @@ <dependencies> <dependency> - <groupId>com.typesafe.play</groupId> - <artifactId>play-json_${scala.binary.version}</artifactId> + <groupId>com.cloudant</groupId> + <artifactId>cloudant-client</artifactId> + <version>2.11.0</version> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.9.0</version> </dependency> <dependency> <groupId>com.typesafe</groupId> @@ -45,11 +51,6 @@ <version>1.3.1</version> </dependency> <dependency> - <groupId>org.joda</groupId> - <artifactId>joda-convert</artifactId> - <version>1.8.1</version> - </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.6.7</version> @@ -106,12 +107,6 @@ <artifactId>scalacheck_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>com.cloudant</groupId> - <artifactId>cloudant-client</artifactId> - <version>2.11.0</version> - <scope>test</scope> - </dependency> </dependencies> <build> <resources> http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf index 6ff4139..7f7beaf 100644 --- a/sql-cloudant/src/main/resources/application.conf +++ b/sql-cloudant/src/main/resources/application.conf @@ -11,6 +11,7 @@ spark-sql { cloudant = { batchInterval = 8 endpoint = "_all_docs" + numberOfRetries = 3 protocol = https useQuery = false queryLimit = 25 
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala index 9f2a7ba..0615e16 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala @@ -27,10 +27,10 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String, bulkSize: Int, schemaSampleSize: Int, createDBOnSave: Boolean, endpoint: String, selector: String, timeout: Int, storageLevel: StorageLevel, useQuery: Boolean, - queryLimit: Int, batchInterval: Int) + queryLimit: Int, batchInterval: Int, numberOfRetries: Int) extends CloudantConfig(protocol, host, dbName, indexName, viewName)(username, password, partitions, maxInPartition, minInPartition, requestTimeout, bulkSize, schemaSampleSize, - createDBOnSave, endpoint, useQuery, queryLimit) { + createDBOnSave, endpoint, useQuery, queryLimit, numberOfRetries) { override val defaultIndex: String = endpoint @@ -42,9 +42,14 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String, if (selector != null && !selector.isEmpty) { selector } else { - // Exclude design docs and deleted=true docs - "{ \"_id\": { \"$regex\": \"^(?!_design/)\" }, " + - "\"_deleted\": { \"$exists\": false } }" + val version = getClient.serverVersion + if (version.matches("1.*")) { + null + } else { + // Exclude design docs and deleted=true docs + "{ \"_id\": { \"$regex\": \"^(?!_design/)\" }, " + + "\"_deleted\": { \"$exists\": false } }" + } } } http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala 
---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala index 8affd4f..c7370f0 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala @@ -16,11 +16,21 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now @@ -28,22 +38,48 @@ import org.apache.bahir.cloudant.common._ */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, - val useQuery: Boolean = false, val queryLimit: Int) + val useQuery: Boolean = false, val queryLimit: Int, + val numberOfRetries: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder + .url(getClientUrl) + .username(username) + .password(password) + 
.interceptors(new Replay429Interceptor(numberOfRetries, 250L)) + .build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} val pkField = "_id" val defaultIndex: String = endpoint val default_filter: String = "*:*" - def getDbUrl: String = { - dbUrl + def executeRequest(stringUrl: String, postData: String = null): HttpConnection = { + val url = new URL(stringUrl) + if(postData != null) { + val conn = Http.POST(url, "application/json") + conn.setRequestBody(postData) + conn.requestProperties.put("User-Agent", "spark-cloudant") + client.executeRequest(conn) + } else { + val conn = Http.GET(url) + conn.requestProperties.put("User-Agent", "spark-cloudant") + client.executeRequest(conn) + } + } + + def getClient: CloudantClient = { + client + } + + def getDatabase: Database = { + database } def getSchemaSampleSize: Int = { @@ -54,20 +90,20 @@ class CloudantConfig(val protocol: String, val host: String, createDBOnSave } - def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get + def getClientUrl: URL = { + new URL(protocol + "://" + host) + } + + def getLastNum(result: JsonObject): JsonObject = result.get("last_seq").getAsJsonObject /* Url containing limit for docs in a Cloudant database. * If a view is not defined, use the _all_docs endpoint. 
* @return url with one doc limit for retrieving total doc count */ def getUrl(limit: Int, excludeDDoc: Boolean = false): String = { - if (viewName == null) { + if (viewPath == null) { val baseUrl = { - if (excludeDDoc) { - dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true" - } else { - dbUrl + "/_all_docs?include_docs=true" - } + dbUrl + "/_all_docs?include_docs=true" } if (limit == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { baseUrl @@ -76,16 +112,16 @@ class CloudantConfig(val protocol: String, val host: String, } } else { if (limit == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { - dbUrl + "/" + viewName + dbUrl + "/" + viewPath } else { - dbUrl + "/" + viewName + "?limit=" + limit + dbUrl + "/" + viewPath + "?limit=" + limit } } } - /* Url containing limit to count total docs in a Cloudant database. + /* Total count of documents in a Cloudant database. * - * @return url with one doc limit for retrieving total doc count + * @return total doc count number */ def getTotalUrl(url: String): String = { if (url.contains('?')) { @@ -100,10 +136,10 @@ class CloudantConfig(val protocol: String, val host: String, } def queryEnabled: Boolean = { - useQuery && indexName == null && viewName == null + useQuery && indexPath == null && viewPath == null } - def allowPartition(queryUsed: Boolean): Boolean = {indexName==null && !queryUsed} + def allowPartition(queryUsed: Boolean): Boolean = {indexPath == null && !queryUsed} def getRangeUrl(field: String = null, start: Any = null, startInclusive: Boolean = false, end: Any = null, @@ -112,7 +148,7 @@ class CloudantConfig(val protocol: String, val host: String, allowQuery: Boolean = false): (String, Boolean, Boolean) = { val (url: String, pusheddown: Boolean, queryUsed: Boolean) = calculate(field, start, startInclusive, end, endInclusive, allowQuery) - if (includeDoc && !queryUsed ) { + if (includeDoc && !queryUsed) { if (url.indexOf('?') > 0) { (url + "&include_docs=true", pusheddown, queryUsed) } else { @@ 
-123,14 +159,12 @@ class CloudantConfig(val protocol: String, val host: String, } } - /* - * Url for paging using skip and limit options when loading docs with partitions. - */ + def getSubSetUrl(url: String, skip: Int, limit: Int, queryUsed: Boolean): String = { val suffix = { if (url.indexOf(JsonStoreConfigManager.ALL_DOCS_INDEX) > 0) { "include_docs=true&limit=" + limit + "&skip=" + skip - } else if (viewName != null) { + } else if (viewPath != null) { "limit=" + limit + "&skip=" + skip } else if (queryUsed) { "" @@ -170,13 +204,13 @@ class CloudantConfig(val protocol: String, val host: String, } } (dbUrl + "/" + defaultIndex + condition, true, false) - } else if (indexName != null) { + } else if (indexPath != null) { // push down to indexName val condition = calculateCondition(field, start, startInclusive, end, endInclusive) - (dbUrl + "/" + indexName + "?q=" + condition, true, false) - } else if (viewName != null) { - (dbUrl + "/" + viewName, false, false) + (dbUrl + "/" + indexPath + "?q=" + condition, true, false) + } else if (viewPath != null) { + (dbUrl + "/" + viewPath, false, false) } else if (allowQuery && useQuery) { (s"$dbUrl/_find", false, true) } else { @@ -220,39 +254,41 @@ class CloudantConfig(val protocol: String, val host: String, } } - def getTotalRows(result: JsValue): Int = { - val resultKeys = result.as[JsObject].keys - if(resultKeys.contains("total_rows")) { - (result \ "total_rows").as[Int] - } else if (resultKeys.contains("pending")) { - (result \ "pending").as[Int] + 1 + def getResultCount(result: String): Int = { + val jsonResult: JsonObject = new JsonParser().parse(result).getAsJsonObject + if (jsonResult.has("total_rows")) { + jsonResult.get("total_rows").getAsInt + } else if (jsonResult.has("pending")) { + jsonResult.get("pending").getAsInt + 1 } else { 1 } } - def getRows(result: JsValue, queryUsed: Boolean): Seq[JsValue] = { - if ( queryUsed ) { - (result \ "docs").as[JsArray].value.map(row => row) + def getRows(result: String, 
queryUsed: Boolean): Seq[JsonObject] = { + val jsonResult: JsonObject = new JsonParser().parse(result).getAsJsonObject + if (queryUsed) { + if (jsonResult.has("docs")) { + jsonResult.get("docs").getAsJsonArray.asScala + .map(row => row.getAsJsonObject).toSeq + } else { + Seq() + } } else { - val containsResultsKey: Boolean = result.as[JsObject].keys.contains("results") - if (containsResultsKey) { - (result \ "results").as[JsArray].value.map(row => (row \ "doc").get) - } else if (viewName == null) { - (result \ "rows").as[JsArray].value.map(row => (row \ "doc").get) + if (jsonResult.has("results")) { + jsonResult.get("result").getAsJsonArray.asScala.map(row => row.getAsJsonObject + .get("doc").getAsJsonObject).toSeq + } else if (viewPath == null) { + jsonResult.get("rows").getAsJsonArray.asScala.map(row => row.getAsJsonObject + .get("doc").getAsJsonObject).toSeq } else { - (result \ "rows").as[JsArray].value.map(row => row) + jsonResult.get("rows").getAsJsonArray.asScala.map(row => row.getAsJsonObject).toSeq } } } - def getBulkPostUrl: String = { - dbUrl + "/_bulk_docs" - } - - def getBulkRows(rows: List[String]): String = { - val docs = rows.map { x => Json.parse(x) } - Json.stringify(Json.obj("docs" -> Json.toJson(docs.toSeq))) + def getBulkRows(rows: List[String]): List[JsonObject] = { + rows.map { x => JsonConverter.toJson(x).getAsJsonObject } } def getConflictErrStr: String = { http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala index 60a7d4a..029cabc 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala @@ -16,8 +16,10 @@ */ 
package org.apache.bahir.cloudant -import play.api.libs.json.Json -import scalaj.http._ +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import okhttp3._ import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel @@ -42,50 +44,43 @@ class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String] } private def receive(): Unit = { - val url = config.getContinuousChangesUrl.toString - val selector: String = if (config.getSelector != null) { - "{\"selector\":" + config.getSelector + "}" - } else { - "{}" - } + val okHttpClient: OkHttpClient = new OkHttpClient.Builder() + .connectTimeout(5, TimeUnit.SECONDS) + .readTimeout(60, TimeUnit.SECONDS) + .build + val url = config.getChangesReceiverUrl.toString - val clRequest: HttpRequest = config.username match { - case null => - Http(url) - .postData(selector) - .timeout(connTimeoutMs = 1000, readTimeoutMs = 0) - .header("Content-Type", "application/json") - .header("User-Agent", "spark-cloudant") - case _ => - Http(url) - .postData(selector) - .timeout(connTimeoutMs = 1000, readTimeoutMs = 0) - .header("Content-Type", "application/json") - .header("User-Agent", "spark-cloudant") - .auth(config.username, config.password) + val builder = new Request.Builder().url(url) + if (config.username != null) { + val credential = Credentials.basic(config.username, config.password) + builder.header("Authorization", credential) + } + if(config.getSelector != null) { + val jsonType = MediaType.parse("application/json; charset=utf-8") + val selector = "{\"selector\":" + config.getSelector + "}" + val selectorBody = RequestBody.create(jsonType, selector) + builder.post(selectorBody) } - clRequest.exec((code, headers, is) => { - if (code == 200) { - scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => { - if (line.length() > 0) { - val json = Json.parse(line) - val jsonDoc = (json \ "doc").getOrElse(null) - var doc = "" - if(jsonDoc != null) { - 
doc = Json.stringify(jsonDoc) - if(!isStopped() && doc.nonEmpty) { - store(doc) - } - } + val request = builder.build + val response = okHttpClient.newCall(request).execute + val status_code = response.code + + if (status_code == 200) { + val changesInputStream = response.body.byteStream + var json = new ChangesRow() + if (changesInputStream != null) { + val bufferedReader = new BufferedReader(new InputStreamReader(changesInputStream)) + while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) { + if (!isStopped() && json != null && !json.getDoc.has("_deleted")) { + store(json.getDoc.toString) } - }) - } else { - val status = headers.getOrElse("Status", IndexedSeq.empty) - val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0) - reportError(errorMsg, new CloudantException(errorMsg)) + } } - }) + } else { + val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status_code + reportError(errorMsg, new CloudantException(errorMsg)) + } } def onStop(): Unit = { http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala index 2685993..47643cc 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala @@ -109,11 +109,11 @@ class DefaultSource extends RelationProvider if (inSchema != null) { inSchema } else if (!config.isInstanceOf[CloudantChangesConfig] - || config.viewName != null || config.indexName != null) { + || config.viewPath != null || config.indexPath != null) { val df = if (config.getSchemaSampleSize == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT && - config.viewName == null - 
&& config.indexName == null) { + config.viewPath == null + && config.indexPath == null) { val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config) dataFrame = sqlContext.read.json(cloudantRDD.toDS()) dataFrame @@ -144,12 +144,7 @@ class DefaultSource extends RelationProvider // Collect and union each RDD to convert all RDDs to a DataFrame changes.foreachRDD((rdd: RDD[String]) => { if (!rdd.isEmpty()) { - if (globalRDD != null) { - // Union RDDs in foreach loop - globalRDD = globalRDD.union(rdd) - } else { - globalRDD = rdd - } + globalRDD = rdd ++ globalRDD } else { // Convert final global RDD[String] to DataFrame dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS()) http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala index ef1f7da..79f681d 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala @@ -16,12 +16,11 @@ */ package org.apache.bahir.cloudant.common +import com.google.gson.JsonObject import org.slf4j.LoggerFactory -import play.api.libs.json.{JsObject, JsString, JsValue} import org.apache.spark.sql.sources._ - /** * Only handles the following filter condition * 1. 
EqualTo,GreaterThan,LessThan,GreaterThanOrEqual,LessThanOrEqual,In @@ -118,11 +117,11 @@ class FilterInterpreter(origFilters: Array[Filter]) { */ class FilterUtil(filters: Map[String, Array[Filter]]) { private val logger = LoggerFactory.getLogger(getClass) - def apply(implicit r: JsValue = null): Boolean = { + def apply(implicit r: JsonObject = null): Boolean = { if (r == null) return true val satisfied = filters.forall({ case (attr, filters) => - val field = JsonUtil.getField(r, attr).getOrElse(null) + val field = JsonUtil.getField(r, attr).orNull if (field == null) { logger.debug(s"field $attr not exisit:$r") false @@ -136,12 +135,10 @@ class FilterUtil(filters: Map[String, Array[Filter]]) { object FilterDDocs { - def filter(row: JsValue): Boolean = { + def filter(row: JsonObject): Boolean = { if (row == null) return true - val id : String = if (row.as[JsObject].keys.contains("_id")) { - JsonUtil.getField(row, "_id").orNull.as[JsString].value - } else if (row.as[JsObject].keys.contains("id")) { - JsonUtil.getField(row, "id").orNull.as[JsString].value + val id : String = if (row.has("_id")) { + row.get("_id").getAsString } else { null } http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala index 9cd495d..0e66f03 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala @@ -40,6 +40,7 @@ object JsonStoreConfigManager { private val CLOUDANT_CHANGES_TIMEOUT = "cloudant.timeout" private val USE_QUERY_CONFIG = "cloudant.useQuery" private val QUERY_LIMIT_CONFIG = 
"cloudant.queryLimit" + private val NUMBER_OF_RETRIES = "cloudant.numberOfRetries" private val FILTER_SELECTOR = "selector" private val PARTITION_CONFIG = "jsonstore.rdd.partitions" @@ -68,22 +69,27 @@ object JsonStoreConfigManager { private def getInt(sparkConf: SparkConf, parameters: Map[String, String], key: String) : Int = { - val valueS = parameters.getOrElse(key, null) - if (sparkConf != null) { - val default = { + try { + val valueS = parameters.getOrElse(key, null) + if (sparkConf != null) { + val default = { + if (valueS == null) { + sparkConf.getInt(key, rootConfig.getInt(key)) + } else { + valueS.toInt + } + } + sparkConf.getInt(s"spark.$key", default) + } else { if (valueS == null) { - sparkConf.getInt(key, rootConfig.getInt(key)) + rootConfig.getInt(key) } else { valueS.toInt } } - sparkConf.getInt(s"spark.$key", default) - } else { - if (valueS == null) { - rootConfig.getInt(key) - } else { - valueS.toInt - } + } catch { + case e: NumberFormatException => + throw new CloudantException(s"Option \'$key\' failed with exception $e") } } @@ -162,22 +168,24 @@ object JsonStoreConfigManager { def getConfig (sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = { - implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG) - implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG) - implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG) - implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG) - implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG) - implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG) - implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG) - implicit val endpoint = getString(sparkConf, parameters, CLOUDANT_API_ENDPOINT) - implicit val selector = getString(sparkConf, parameters, FILTER_SELECTOR) - implicit val storageLevel = getStorageLevel( + implicit val total: 
Int = getInt(sparkConf, parameters, PARTITION_CONFIG) + implicit val max: Int = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG) + implicit val min: Int = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG) + implicit val requestTimeout: Long = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG) + implicit val bulkSize: Int = getInt(sparkConf, parameters, BULK_SIZE_CONFIG) + implicit val schemaSampleSize: Int = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG) + implicit val createDBOnSave: Boolean = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG) + implicit val endpoint: String = getString(sparkConf, parameters, CLOUDANT_API_ENDPOINT) + implicit val selector: String = getString(sparkConf, parameters, FILTER_SELECTOR) + implicit val storageLevel: StorageLevel = getStorageLevel( sparkConf, parameters, STORAGE_LEVEL_FOR_CHANGES_INDEX) - implicit val timeout = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT) - implicit val batchInterval = getInt(sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL) + implicit val timeout: Int = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT) + implicit val batchInterval: Int = getInt( + sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL) + implicit val numberOfRetries: Int = getInt(sparkConf, parameters, NUMBER_OF_RETRIES) - implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG) - implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG) + implicit val useQuery: Boolean = getBool(sparkConf, parameters, USE_QUERY_CONFIG) + implicit val queryLimit: Int = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG) val dbName = parameters.getOrElse("database", parameters.getOrElse("path", throw new CloudantException(s"Cloudant database name is empty. 
" + @@ -193,13 +201,12 @@ object JsonStoreConfigManager { if (endpoint == ALL_DOCS_INDEX) { new CloudantConfig(protocol, host, dbName, indexName, viewName) (user, passwd, total, max, min, requestTimeout, bulkSize, - schemaSampleSize, createDBOnSave, endpoint, useQuery, - queryLimit) + schemaSampleSize, createDBOnSave, endpoint, useQuery, queryLimit, numberOfRetries) } else if (endpoint == CHANGES_INDEX) { new CloudantChangesConfig(protocol, host, dbName, indexName, viewName) (user, passwd, total, max, min, requestTimeout, bulkSize, schemaSampleSize, createDBOnSave, endpoint, selector, - timeout, storageLevel, useQuery, queryLimit, batchInterval) + timeout, storageLevel, useQuery, queryLimit, batchInterval, numberOfRetries) } else { throw new CloudantException(s"spark.$CLOUDANT_API_ENDPOINT parameter " + s"is invalid. Please supply the valid option '" + ALL_DOCS_INDEX + "' or '" + http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala index 9d09ecb..3400468 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala @@ -18,32 +18,28 @@ package org.apache.bahir.cloudant.common import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent._ import scala.concurrent.duration._ import scala.language.implicitConversions import scala.util.{Failure, Success} -import scalaj.http.{Http, HttpRequest, HttpResponse} import ExecutionContext.Implicits.global +import com.cloudant.client.api.model.Response +import 
com.cloudant.http.HttpConnection +import com.google.gson.{Gson, JsonElement, JsonObject} import org.slf4j.{Logger, LoggerFactory} -import play.api.libs.json._ import org.apache.bahir.cloudant.CloudantConfig - class JsonStoreDataAccess (config: CloudantConfig) { lazy val logger: Logger = LoggerFactory.getLogger(getClass) implicit lazy val timeout: Long = config.requestTimeout def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = { - if (limit == 0) { - throw new CloudantException("Database " + config.getDbname + - " schema sample size is 0!") - } - if (limit < -1) { - throw new CloudantException("Database " + config.getDbname + - " schema sample size is " + limit + "!") + if (limit == 0 || limit < -1) { + throw new CloudantException("Schema size '" + limit + "' is not valid.") } var r = this.getQueryResult[Seq[String]](config.getUrl(limit), processAll) if (r.isEmpty) { @@ -58,35 +54,30 @@ class JsonStoreDataAccess (config: CloudantConfig) { } } - def getAll[T](url: String) - (implicit columns: Array[String] = null): Seq[String] = { - this.getQueryResult[Seq[String]](url, processAll) - } - def getIterator(skip: Int, limit: Int, url: String) (implicit columns: Array[String] = null, postData: String = null): Iterator[String] = { + logger.info(s"Loading data from Cloudant using: $url , postData: $postData") val newUrl = config.getSubSetUrl(url, skip, limit, postData != null) this.getQueryResult[Iterator[String]](newUrl, processIterator) } def getTotalRows(url: String, queryUsed: Boolean) - (implicit postData: String = null): Int = { - if (queryUsed) config.queryLimit // Query can not retrieve total row now. - else { - val totalUrl = config.getTotalUrl(url) - this.getQueryResult[Int](totalUrl, - { result => config.getTotalRows(Json.parse(result))}) - } + (implicit postData: String = null): Int = { + if (queryUsed) config.queryLimit // Query can not retrieve total row now. 
+ else { + val totalUrl = config.getTotalUrl(url) + this.getQueryResult[Int](totalUrl, + { result => config.getResultCount(result)}) + } } - private def processAll (result: String) + private def processAll(result: String) (implicit columns: Array[String], postData: String = null) = { logger.debug(s"processAll:$result, columns:$columns") - val jsonResult: JsValue = Json.parse(result) - var rows = config.getRows(jsonResult, postData != null ) - if (config.viewName == null && postData == null) { + var rows = config.getRows(result, postData != null) + if (config.viewPath == null && postData == null) { // filter design docs rows = rows.filter(r => FilterDDocs.filter(r)) } @@ -99,104 +90,43 @@ class JsonStoreDataAccess (config: CloudantConfig) { processAll(result).iterator } - private def convert(rec: JsValue)(implicit columns: Array[String]): String = { - if (columns == null) return Json.stringify(Json.toJson(rec)) - val m = new mutable.HashMap[String, JsValue]() - for ( x <- columns) { - val field = JsonUtil.getField(rec, x).getOrElse(JsNull) - m.put(x, field) + private def convert(rec: JsonElement)(implicit columns: Array[String]): String = { + if (columns == null) { + rec.getAsJsonObject.toString + } else { + val jsonObject = new JsonObject + for (x <- columns) { + val field = JsonUtil.getField(rec, x).orNull + jsonObject.add(x, field) + } + val result = jsonObject.toString + logger.debug(s"converted: $result") + result } - val result = Json.stringify(Json.toJson(m.toMap)) - logger.debug(s"converted: $result") - result - } - - - def getChanges(url: String, processResults: (String) => String): String = { - getQueryResult(url, processResults) } private def getQueryResult[T] - (url: String, postProcessor: (String) => T) - (implicit columns: Array[String] = null, - postData: String = null) : T = { + (url: String, postProcessor: (String) => T) + (implicit columns: Array[String] = null, + postData: String = null) : T = { logger.info(s"Loading data from Cloudant using: $url 
, postData: $postData") - val clRequest: HttpRequest = getClRequest(url, postData) + val clRequest: HttpConnection = config.executeRequest(url, postData) - val clResponse: HttpResponse[String] = clRequest.execute() - if (! clResponse.isSuccess) { + val clResponse: HttpConnection = clRequest.execute() + if (clResponse.getConnection.getResponseCode != 200) { throw new CloudantException("Database " + config.getDbname + - " request error: " + clResponse.body) + " request error: " + clResponse.responseAsString) } - val data = postProcessor(clResponse.body) - logger.debug(s"got result: $data") + val data = postProcessor(clResponse.responseAsString) + logger.debug(s"got result:$data") data } def createDB(): Unit = { - val url = config.getDbUrl.toString - val clRequest: HttpRequest = getClRequest(url, null, "PUT") - - val clResponse: HttpResponse[String] = clRequest.execute() - if (! clResponse.isSuccess) { - throw new CloudantException("Database " + config.getDbname + - " create error: " + clResponse.body) - } else { - logger.warn(s"Database ${config.getDbname} was created.") - } + config.getClient.createDB(config.getDbname) } - - def getClRequest(url: String, postData: String = null, - httpMethod: String = null): HttpRequest = { - val requestTimeout = config.requestTimeout.toInt - config.username match { - case null => - if (postData != null) { - Http(url) - .postData(postData) - .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) - .header("Content-Type", "application/json") - .header("User-Agent", "spark-cloudant") - } else { - if (httpMethod != null) { - Http(url) - .method(httpMethod) - .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) - .header("User-Agent", "spark-cloudant") - } else { - Http(url) - .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) - .header("User-Agent", "spark-cloudant") - } - } - case _ => - if (postData != null) { - Http(url) - .postData(postData) - .timeout(connTimeoutMs = 1000, readTimeoutMs = 
requestTimeout) - .header("Content-Type", "application/json") - .header("User-Agent", "spark-cloudant") - .auth(config.username, config.password) - } else { - if (httpMethod != null) { - Http(url) - .method(httpMethod) - .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) - .header("User-Agent", "spark-cloudant") - .auth(config.username, config.password) - } else { - Http(url) - .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout) - .header("User-Agent", "spark-cloudant") - .auth(config.username, config.password) - } - } - } - } - - def saveAll(rows: List[String]): Unit = { if (rows.isEmpty) return val bulkSize = config.bulkSize @@ -205,31 +135,29 @@ class JsonStoreDataAccess (config: CloudantConfig) { logger.debug(s"total records:${rows.size}=bulkSize:$bulkSize * totalBulks:$totalBulks") val futures = bulks.map( bulk => { - val data = config.getBulkRows(bulk) - val url = config.getBulkPostUrl.toString - val clRequest: HttpRequest = getClRequest(url, data) + val jsonData = config.getBulkRows(bulk) Future { - clRequest.execute() + config.getDatabase.bulk(jsonData.asJava) } } ) // remaining - number of requests remained to succeed val remaining = new AtomicInteger(futures.length) - val p = Promise[HttpResponse[String]] + val p = Promise[java.util.List[Response]] futures foreach { _ onComplete { - case Success(clResponse: HttpResponse[String]) => - // find if there was error in saving at least one of docs - val resBody: String = clResponse.body - val isErr = (resBody contains config.getConflictErrStr) || - (resBody contains config.getForbiddenErrStr) - if (!clResponse.isSuccess || isErr) { + case Success(clResponses) => + if (clResponses contains config.getConflictErrStr) { + val e = new CloudantException("Save to database:" + config.getDbname + + " failed with reason: " + config.getConflictErrStr) + p.tryFailure(e) + } else if (clResponses contains config.getForbiddenErrStr) { val e = new CloudantException("Save to database:" + config.getDbname + 
- " failed with reason: " + clResponse.body) + " failed with reason: " + config.getForbiddenErrStr) p.tryFailure(e) } else if (remaining.decrementAndGet() == 0) { // succeed the whole save operation if all requests success - p.trySuccess(clResponse) + p.trySuccess(clResponses) } // if a least one save request fails - fail the whole save operation case Failure(e) => http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala index c4f828d..2ba480d 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala @@ -16,8 +16,8 @@ */ package org.apache.bahir.cloudant.common +import com.google.gson._ import org.slf4j.LoggerFactory -import play.api.libs.json.{Json, JsValue} import org.apache.spark.Partition import org.apache.spark.SparkContext @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources._ import org.apache.bahir.cloudant.CloudantConfig +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /** * JsonStoreRDDPartition defines each partition as a subset of a query result: @@ -33,8 +34,9 @@ import org.apache.bahir.cloudant.CloudantConfig */ private[cloudant] class JsonStoreRDDPartition(val url: String, val skip: Int, val limit: Int, - val idx: Int, val config: CloudantConfig, val selector: JsValue, val fields: JsValue, - val queryUsed: Boolean) + val idx: Int, val config: CloudantConfig, + val selector: JsonElement, val fields: JsonElement, + val queryUsed: Boolean) extends Partition with Serializable{ val index: Int = idx } @@ -92,7 +94,9 @@ class JsonStoreRDD(sc: SparkContext, config: 
CloudantConfig) } } - private def convertToMangoJson(f: Filter): (String, JsValue) = { + private def convertToMangoJson(f: Filter): (String, JsonElement) = { + val gson = new Gson + val parser = new JsonParser() val (op, value): (String, Any) = f match { case EqualTo(attr, v) => ("$eq", v) case GreaterThan(attr, v) => ("$gt", v) @@ -101,16 +105,16 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) case LessThanOrEqual(attr, v) => ("$lte", v) case _ => (null, null) } - val convertedV: JsValue = { + val convertedV: JsonElement = { // TODO Better handing of other types if (value != null) { value match { - case s: String => Json.toJson(s) - case l: Long => Json.toJson(l) - case d: Double => Json.toJson(d) - case i: Int => Json.toJson(i) - case b: Boolean => Json.toJson(b) - case t: java.sql.Timestamp => Json.toJson(t) + case s: String => parser.parse(s) + case l: Long => parser.parse(l.toString) + case d: Double => parser.parse(d.toString) + case i: Int => parser.parse(i.toString) + case b: Boolean => parser.parse(b.toString) + case t: java.sql.Timestamp => parser.parse(t.toString) case a: Any => logger.debug(s"Ignore field:$name, cannot handle its datatype: $a"); null } } else null @@ -118,7 +122,7 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) (op, convertedV) } - private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsValue] = { + private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsonElement] = { filters.map(af => convertToMangoJson(af)) .filter(x => x._2 != null) .toMap @@ -137,10 +141,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) } } - val (selector, fields) : (JsValue, JsValue) = { + val (selector, fields) : (JsonElement, JsonElement) = { if (!config.queryEnabled || origAttrToFilters == null) (null, null) else { - val selectors: Map[String, Map[String, JsValue]] = + val selectors: Map[String, Map[String, JsonElement]] = origAttrToFilters.transform( (name, attrFilters) => 
convertAttrToMangoJson(attrFilters)) val filteredSelectors = selectors.filter((t) => t._2.nonEmpty) @@ -149,10 +153,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) if (requiredcolumns == null || requiredcolumns.length == 0) { null } else { - Json.toJson(requiredcolumns) + JsonConverter.toJson(requiredcolumns) } } - (Json.toJson(filteredSelectors), queryColumns) + (JsonConverter.toJson(filteredSelectors), queryColumns) } else (null, null) } } @@ -174,7 +178,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) implicit val postData : String = { if (queryUsed) { - Json.stringify(Json.obj("selector" -> selector, "limit" -> 1)) + val jsonSelector = new JsonObject + jsonSelector.addProperty("selector", selector.toString) + jsonSelector.addProperty("limit", 1) + jsonSelector.toString } else { null } @@ -191,7 +198,8 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) (0 until totalPartition).map(i => { val skip = i * limitPerPartition new JsonStoreRDDPartition(url, skip, limitPerPartition, i, - config, selector, fields, queryUsed).asInstanceOf[Partition] + config, selector, fields, queryUsed) + .asInstanceOf[Partition] }).toArray } @@ -199,12 +207,15 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) Iterator[String] = { val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition] implicit val postData : String = { + val jsonObject = new JsonObject + jsonObject.add("selector", myPartition.selector) + jsonObject.addProperty("skip", myPartition.skip) if (myPartition.queryUsed && myPartition.fields != null) { - Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields, - "limit" -> myPartition.limit, "skip" -> myPartition.skip)) + jsonObject.add("fields", myPartition.fields) + jsonObject.toString } else if (myPartition.queryUsed) { - Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit, - "skip" -> myPartition.skip)) + 
jsonObject.addProperty("limit", myPartition.limit) + jsonObject.toString } else { null } http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala index 82d9afc..eadceea 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala @@ -16,27 +16,38 @@ */ package org.apache.bahir.cloudant.common -import play.api.libs.json.JsValue import scala.util.control.Breaks._ +import com.google.gson.{JsonElement, JsonParser} + object JsonUtil { - def getField(row: JsValue, field: String) : Option[JsValue] = { + def getField(row: JsonElement, field: String) : Option[JsonElement] = { var path = field.split('.') var currentValue = row - var finalValue: Option[JsValue] = None + var finalValue: Option[JsonElement] = None breakable { for (i <- path.indices) { - val f: Option[JsValue] = (currentValue \ path(i)).toOption - f match { - case Some(f2) => currentValue = f2 - case None => break - } - if (i == path.length -1) { - // The leaf node - finalValue = Some(currentValue) + if (currentValue != null && currentValue.isJsonObject) { + val f: Option[JsonElement] = + Option(currentValue.getAsJsonObject.get(path(i))) + f match { + case Some(f2) => currentValue = f2 + case None => break + } + if (i == path.length - 1) { + // The leaf node + finalValue = Some(currentValue) + } } } } finalValue } + + object JsonConverter { + val parser = new JsonParser + def toJson(value: Any): JsonElement = { + parser.parse(value.toString) + } + } } http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala 
---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala index 323aab6..56671b3 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala @@ -17,8 +17,10 @@ package org.apache.bahir.cloudant.internal import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit -import scalaj.http._ +import com.google.gson.JsonParser +import okhttp3._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver @@ -37,45 +39,46 @@ class ChangesReceiver(config: CloudantChangesConfig) } private def receive(): Unit = { - // Get normal _changes url + val okHttpClient: OkHttpClient = new OkHttpClient.Builder() + .connectTimeout(5, TimeUnit.SECONDS) + .readTimeout(60, TimeUnit.SECONDS) + .build val url = config.getChangesReceiverUrl.toString - val selector: String = { - "{\"selector\":" + config.getSelector + "}" - } - val clRequest: HttpRequest = config.username match { - case null => - Http(url) - .postData(selector) - .header("Content-Type", "application/json") - .header("User-Agent", "spark-cloudant") - case _ => - Http(url) - .postData(selector) - .header("Content-Type", "application/json") - .header("User-Agent", "spark-cloudant") - .auth(config.username, config.password) + val builder = new Request.Builder().url(url) + if (config.username != null) { + val credential = Credentials.basic(config.username, config.password) + builder.header("Authorization", credential) + } + if(config.getSelector != null) { + val jsonType = MediaType.parse("application/json; charset=utf-8") + val selector = "{\"selector\":" + config.getSelector + "}" + val selectorBody = RequestBody.create(jsonType, selector) + 
builder.post(selectorBody) } - clRequest.exec((code, headers, is) => { - if (code == 200) { - var json = new ChangesRow() - if (is != null) { - val bufferedReader = new BufferedReader(new InputStreamReader(is)) - while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) { - if (!isStopped() && json != null && !json.getDoc.has("_deleted")) { - store(json.getDoc.toString) - } + val request = builder.build + val response = okHttpClient.newCall(request).execute + val status_code = response.code + + if (status_code == 200) { + val changesInputStream = response.body.byteStream + var json = new ChangesRow() + if (changesInputStream != null) { + val bufferedReader = new BufferedReader(new InputStreamReader(changesInputStream)) + while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) { + if (!isStopped() && json != null && !json.getDoc.has("_deleted")) { + store(json.getDoc.toString) } } - } else { - val status = headers.getOrElse("Status", IndexedSeq.empty) - val errorMsg = "Error retrieving _changes feed data from database " + - "'" + config.getDbname + "': " + status(0) - reportError(errorMsg, new CloudantException(errorMsg)) - CloudantChangesConfig.receiverErrorMsg = errorMsg } - }) + } else { + val responseAsJson = new JsonParser().parse(response.body.string) + val errorMsg = "Error retrieving _changes feed data from database " + "'" + + config.getDbname + "' with response code " + status_code + ": " + responseAsJson.toString + reportError(errorMsg, new CloudantException(errorMsg)) + CloudantChangesConfig.receiverErrorMsg = errorMsg + } } override def onStop(): Unit = { http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala 
b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala index 8495026..c487937 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala @@ -42,7 +42,6 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { } assert(thrown.getMessage === s"spark.cloudant.endpoint parameter " + s"is invalid. Please supply the valid option '_all_docs' or '_changes'.") - } testIfEnabled("empty username option throws an error message") { @@ -99,7 +98,35 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { val thrown = intercept[CloudantException] { spark.read.format("org.apache.bahir.cloudant").load("n_flight") } - assert(thrown.getMessage === s"Error retrieving _changes feed data" + - s" from database 'n_flight': HTTP/1.1 401 Unauthorized") + assert(thrown.getMessage === "Error retrieving _changes feed data" + + " from database 'n_flight' with response code 401: {\"error\":\"unauthorized\"," + + "\"reason\":\"Name or password is incorrect.\"}") + } + + testIfEnabled("string with valid value for cloudant.numberOfRetries option") { + spark = SparkSession.builder().config(conf) + .config("cloudant.host", TestUtils.getHost) + .config("cloudant.username", TestUtils.getUsername) + .config("cloudant.password", TestUtils.getPassword) + .config("cloudant.numberOfRetries", "5") + .getOrCreate() + + val df = spark.read.format("org.apache.bahir.cloudant").load("n_booking") + assert(df.count() === 2) + } + + testIfEnabled("invalid value for cloudant.numberOfRetries option throws an error message") { + spark = SparkSession.builder().config(conf) + .config("cloudant.host", TestUtils.getHost) + .config("cloudant.username", TestUtils.getUsername) + .config("cloudant.password", TestUtils.getPassword) + .config("cloudant.numberOfRetries", "five") + .getOrCreate() + + val thrown = intercept[CloudantException] { 
+ spark.read.format("org.apache.bahir.cloudant").load("db") + } + assert(thrown.getMessage === s"Option \'cloudant.numberOfRetries\' failed with exception " + + s"""java.lang.NumberFormatException: For input string: "five"""") } }
