Repository: bahir Updated Branches: refs/heads/master 770b2916f -> ebdc8b257
[BAHIR-137] CouchDB/Cloudant _changes feed receiver improvements Adds batchInterval option for tuning _changes receiver's streaming batch interval Throw a CloudantException if the final schema for the _changes receiver is empty Call stop method in streaming receiver when there's an error Closes #60 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/ebdc8b25 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/ebdc8b25 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/ebdc8b25 Branch: refs/heads/master Commit: ebdc8b257d32ff64a88657cc3e7dc838564a1d01 Parents: 770b291 Author: Esteban Laver <[email protected]> Authored: Mon Oct 2 11:09:28 2017 -0400 Committer: Luciano Resende <[email protected]> Committed: Wed Jan 24 20:03:20 2018 -0500 ---------------------------------------------------------------------- sql-cloudant/README.md | 1 + sql-cloudant/src/main/resources/application.conf | 1 + .../bahir/cloudant/CloudantChangesConfig.scala | 11 ++++++++++- .../org/apache/bahir/cloudant/DefaultSource.scala | 11 ++++++++--- .../cloudant/common/JsonStoreConfigManager.scala | 6 ++++-- .../bahir/cloudant/internal/ChangesReceiver.scala | 4 +++- .../apache/bahir/cloudant/CloudantOptionSuite.scala | 16 ++++++++++++++++ 7 files changed, 43 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/README.md ---------------------------------------------------------------------- diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index 18118eb..160e279 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -57,6 +57,7 @@ Default values are defined in [here](src/main/resources/application.conf). Name | Default | Meaning --- |:---:| --- +cloudant.batchInterval|8|number of seconds to set for streaming all documents from `_changes` endpoint into Spark dataframe. 
See [Setting the right batch interval](https://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval) for tuning this value. cloudant.endpoint|`_all_docs`|endpoint for RelationProvider when loading data from Cloudant to DataFrames or SQL temporary tables. Select between the Cloudant `_all_docs` or `_changes` API endpoint. See **Note** below for differences between endpoints. cloudant.protocol|https|protocol to use to transfer data: http or https cloudant.host| |cloudant host url http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf index 62497e2..6ff4139 100644 --- a/sql-cloudant/src/main/resources/application.conf +++ b/sql-cloudant/src/main/resources/application.conf @@ -9,6 +9,7 @@ spark-sql { requestTimeout = 900000 } cloudant = { + batchInterval = 8 endpoint = "_all_docs" protocol = https useQuery = false http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala index 0e70b95..9f2a7ba 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala @@ -27,13 +27,17 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String, bulkSize: Int, schemaSampleSize: Int, createDBOnSave: Boolean, endpoint: String, selector: String, timeout: Int, storageLevel: StorageLevel, useQuery: Boolean, - queryLimit: Int) + queryLimit: Int, 
batchInterval: Int) extends CloudantConfig(protocol, host, dbName, indexName, viewName)(username, password, partitions, maxInPartition, minInPartition, requestTimeout, bulkSize, schemaSampleSize, createDBOnSave, endpoint, useQuery, queryLimit) { override val defaultIndex: String = endpoint + def getBatchInterval : Int = { + batchInterval + } + def getSelector : String = { if (selector != null && !selector.isEmpty) { selector @@ -80,3 +84,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String, dbUrl + "/" + JsonStoreConfigManager.ALL_DOCS_INDEX } } + +object CloudantChangesConfig { + // Error message from internal _changes receiver + var receiverErrorMsg: String = "" +} http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala index 1596133..36c2c78 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala @@ -125,9 +125,10 @@ class DefaultSource extends RelationProvider /* Create a streaming context to handle transforming docs in * larger databases into Spark datasets */ - val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8)) - val changesConfig = config.asInstanceOf[CloudantChangesConfig] + val ssc = new StreamingContext(sqlContext.sparkContext, + Seconds(changesConfig.getBatchInterval)) + val changes = ssc.receiverStream( new ChangesReceiver(changesConfig)) changes.persist(changesConfig.getStorageLevelForStreaming) @@ -158,7 +159,11 @@ class DefaultSource extends RelationProvider // run streaming until all docs from continuous feed are received ssc.awaitTermination - dataFrame.schema + if(dataFrame.schema.nonEmpty) 
{ + dataFrame.schema + } else { + throw new CloudantException(CloudantChangesConfig.receiverErrorMsg) + } } } CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext) http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala index 40b4b1a..9cd495d 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala @@ -18,8 +18,8 @@ package org.apache.bahir.cloudant.common import com.typesafe.config.ConfigFactory -import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf +import org.apache.spark.sql.SQLContext import org.apache.spark.storage.StorageLevel import org.apache.bahir.cloudant.{CloudantChangesConfig, CloudantConfig} @@ -35,6 +35,7 @@ object JsonStoreConfigManager { private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password" private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol" private val CLOUDANT_API_ENDPOINT = "cloudant.endpoint" + private val CLOUDANT_STREAMING_BATCH_INTERVAL = "cloudant.batchInterval" private val STORAGE_LEVEL_FOR_CHANGES_INDEX = "cloudant.storageLevel" private val CLOUDANT_CHANGES_TIMEOUT = "cloudant.timeout" private val USE_QUERY_CONFIG = "cloudant.useQuery" @@ -173,6 +174,7 @@ object JsonStoreConfigManager { implicit val storageLevel = getStorageLevel( sparkConf, parameters, STORAGE_LEVEL_FOR_CHANGES_INDEX) implicit val timeout = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT) + implicit val batchInterval = getInt(sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL) implicit val useQuery = getBool(sparkConf, 
parameters, USE_QUERY_CONFIG) implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG) @@ -197,7 +199,7 @@ object JsonStoreConfigManager { new CloudantChangesConfig(protocol, host, dbName, indexName, viewName) (user, passwd, total, max, min, requestTimeout, bulkSize, schemaSampleSize, createDBOnSave, endpoint, selector, - timeout, storageLevel, useQuery, queryLimit) + timeout, storageLevel, useQuery, queryLimit, batchInterval) } else { throw new CloudantException(s"spark.$CLOUDANT_API_ENDPOINT parameter " + s"is invalid. Please supply the valid option '" + ALL_DOCS_INDEX + "' or '" + http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala index ac0aac6..323aab6 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala @@ -70,8 +70,10 @@ class ChangesReceiver(config: CloudantChangesConfig) } } else { val status = headers.getOrElse("Status", IndexedSeq.empty) - val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0) + val errorMsg = "Error retrieving _changes feed data from database " + + "'" + config.getDbname + "': " + status(0) reportError(errorMsg, new CloudantException(errorMsg)) + CloudantChangesConfig.receiverErrorMsg = errorMsg } }) } http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala 
b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala index a8c8646..8495026 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala @@ -86,4 +86,20 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { assert(thrown.getMessage === s"Cloudant database name is empty. " + s"Please supply the required value.") } + + testIfEnabled("incorrect password throws an error message for changes receiver") { + spark = SparkSession.builder().config(conf) + .config("cloudant.protocol", TestUtils.getProtocol) + .config("cloudant.host", TestUtils.getHost) + .config("cloudant.username", TestUtils.getUsername) + .config("cloudant.password", TestUtils.getPassword.concat("a")) + .config("cloudant.endpoint", "_changes") + .getOrCreate() + + val thrown = intercept[CloudantException] { + spark.read.format("org.apache.bahir.cloudant").load("n_flight") + } + assert(thrown.getMessage === s"Error retrieving _changes feed data" + + s" from database 'n_flight': HTTP/1.1 401 Unauthorized") + } }
