Repository: bahir Updated Branches: refs/heads/master 57cb15fd3 -> 0e1505a89
[BAHIR-128] Improve sql-cloudant _changes receiver This change improves the stability of the _changes receiver and fixes the intermittently failing test in sql-cloudant's CloudantChangesDFSuite. How: Improve performance and decrease testing time by setting the streaming batch interval to 8 seconds and using the seq_interval _changes feed option. Use getResource to load the JSON file paths. Added Mike Rhodes's ChangesRowScanner for reading each _changes line and transforming it into a Gson JSON object. Added Mike Rhodes's ChangesRow representing a row in the changes feed. Closes #57 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/0e1505a8 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/0e1505a8 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/0e1505a8 Branch: refs/heads/master Commit: 0e1505a8960bfe40ea025267bbf36ec5c4cf5c79 Parents: 57cb15f Author: Esteban Laver <[email protected]> Authored: Fri Sep 8 10:33:26 2017 -0400 Committer: Luciano Resende <[email protected]> Committed: Tue Dec 19 20:26:50 2017 -0800 ---------------------------------------------------------------------- .../bahir/cloudant/common/ChangesRow.java | 77 ++++++++++++++++++++ .../cloudant/common/ChangesRowScanner.java | 77 ++++++++++++++++++++ .../bahir/cloudant/CloudantChangesConfig.scala | 3 +- .../apache/bahir/cloudant/DefaultSource.scala | 2 +- .../cloudant/common/JsonStoreDataAccess.scala | 2 +- .../cloudant/internal/ChangesReceiver.scala | 41 +++-------- .../bahir/cloudant/ClientSparkFunSuite.scala | 13 +++- .../bahir/cloudant/CloudantChangesDFSuite.scala | 15 ++-- 8 files changed, 185 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java ---------------------------------------------------------------------- diff --git 
a/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java
new file mode 100644
index 0000000..a3ed125
--- /dev/null
+++ b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.util.List;
+
+/**
+ * A single row of a CouchDB/Cloudant {@code _changes} feed, deserialized with Gson.
+ * In a normal (non-continuous) feed every element of the {@code "results"} array
+ * is one {@code ChangesRow}:
+ *
+ * <pre>
+ * {
+ *   "results": [
+ *     {
+ *       "changes": [ {"rev": "2-eec205a9d413992850a6e32678485900"}, ... ],
+ *       "deleted": true,
+ *       "id": "deleted",
+ *       "seq": 5,
+ *       "doc": ... structure ...
+ *     }
+ *   ],
+ *   "last_seq": 5
+ * }
+ * </pre>
+ *
+ * Fields missing from a row are left {@code null} by Gson.
+ */
+public class ChangesRow {
+
+    /**
+     * One entry of the {@code "changes"} array. Declared {@code static} so that Gson
+     * can create it without an enclosing {@code ChangesRow} instance.
+     */
+    public static class Rev {
+        private String rev;
+
+        /** @return the revision string, or {@code null} if absent */
+        public String getRev() {
+            return rev;
+        }
+    }
+
+    private List<Rev> changes;
+    /** {@code true} when the row describes a deleted document; {@code null} if absent. */
+    public Boolean deleted;
+    private String id;
+    private JsonElement seq;
+    private JsonObject doc;
+
+    /** @return the revisions listed under {@code "changes"}, or {@code null} if absent */
+    public List<Rev> getChanges() {
+        return changes;
+    }
+
+    /**
+     * Returns the row's {@code "seq"} value as JSON text; a string sequence keeps its
+     * surrounding quotes because it is produced by {@link JsonElement#toString()}.
+     *
+     * @return the JSON text of {@code "seq"}, or {@code null} if the field is missing
+     *         or JSON {@code null}
+     */
+    public String getSeq() {
+        // Gson leaves the field null when "seq" is missing, so guard before dereferencing.
+        if (seq == null || seq.isJsonNull()) {
+            return null;
+        }
+        return seq.toString();
+    }
+
+    /** @return the document id, or {@code null} if absent */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @return the embedded document, or {@code null} if absent (presumably only sent
+     *         when the feed is requested with {@code include_docs=true})
+     */
+    public JsonObject getDoc() {
+        return doc;
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
new file mode 100644
index 0000000..8fa2cde
--- /dev/null
+++ b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common;
+
+import com.google.gson.Gson;
+import java.io.BufferedReader;
+import java.io.IOException;
+
+/**
+ * Reads through a {@code _changes} stream until it finds the next meaningful line:
+ * either a change entry, which is parsed into a {@link ChangesRow}, or the closing
+ * {@code last_seq} line, which ends the feed.
+ */
+public class ChangesRowScanner {
+
+    private static final Gson GSON = new Gson();
+
+    private ChangesRowScanner() {
+        // static utility class; not meant to be instantiated
+    }
+
+    /**
+     * Reads up to the next meaningful line of a changes feed and parses it into a
+     * {@link ChangesRow}. Works for all styles of changes feed (normal, longpoll,
+     * continuous).
+     *
+     * @param changesReader reader over the body of a {@code _changes} response
+     * @return the next row, or {@code null} once the {@code last_seq} line or the
+     *         end of the stream is reached
+     * @throws IOException if there's a problem reading the stream
+     */
+    public static ChangesRow readRowFromReader(BufferedReader changesReader)
+            throws IOException {
+        String line = nextMeaningfulLine(changesReader);
+        // null = end of stream; "last_seq": closes a normal/longpoll feed and
+        // {"last_seq": closes a continuous one.
+        if (line == null
+                || line.startsWith("\"last_seq\":")
+                || line.startsWith("{\"last_seq\":")) {
+            return null;
+        }
+        // Rows inside the "results" array of a normal/longpoll feed end with a comma.
+        if (line.endsWith(",")) {
+            line = line.substring(0, line.length() - 1);
+        }
+        return GSON.fromJson(line, ChangesRow.class);
+    }
+
+    /**
+     * Returns the next line that is neither an empty heartbeat nor the line opening
+     * the {@code "results"} array or the {@code "],"} line closing it (normal/longpoll
+     * feeds), or {@code null} at the end of the stream.
+     */
+    private static String nextMeaningfulLine(BufferedReader reader) throws IOException {
+        String line;
+        while ((line = reader.readLine()) != null) {
+            boolean skip = line.isEmpty()
+                    || line.startsWith("{\"results\":")
+                    || line.startsWith("],");
+            if (!skip) {
+                return line;
+            }
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
----------------------------------------------------------------------
diff --git
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
index 4c4e0b2..0e70b95 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
@@ -67,7 +67,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
   }
 
   def getChangesReceiverUrl: String = {
-    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=continuous&timeout=" + timeout
+    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" +
+      "&seq_interval=" + bulkSize + "&timeout=" + timeout
     if (getSelector != null) {
       url = url + "&filter=_selector"
     }
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 37f2f1b..1596133 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -125,7 +125,7 @@ class DefaultSource extends RelationProvider
         /* Create a streaming context to handle transforming docs in
         * larger databases into Spark datasets
         */
-        val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+        val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8))
         val changesConfig = config.asInstanceOf[CloudantChangesConfig]
         val changes = ssc.receiverStream(
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
index df5a18b..9d09ecb 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -130,7 +130,7 @@ class JsonStoreDataAccess (config: CloudantConfig) {
           " request error: " + clResponse.body)
     }
     val data = postProcessor(clResponse.body)
-    logger.debug(s"got result:$data")
+    logger.debug(s"got result: $data")
     data
   }
 
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
index de026ba..ac0aac6 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
@@ -16,18 +16,16 @@
  */
 package org.apache.bahir.cloudant.internal
 
-import org.slf4j.{Logger, LoggerFactory}
-import play.api.libs.json.Json
+import java.io.{BufferedReader, InputStreamReader}
+
 import scalaj.http._
 
-import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver
 
 import org.apache.bahir.cloudant.CloudantChangesConfig
 import org.apache.bahir.cloudant.common._
 
-
 class ChangesReceiver(config: CloudantChangesConfig)
   extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
 
@@ -39,28 +37,21 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
 
   private def receive(): Unit = {
-    // Get total number of docs in database using _all_docs endpoint
-    val limit = new JsonStoreDataAccess(config)
-      .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-    // Get continuous _changes url
+    // Get normal _changes url
     val url = config.getChangesReceiverUrl.toString
     val selector: String = {
       "{\"selector\":" + config.getSelector + "}"
     }
 
-    var count = 0
     val clRequest: HttpRequest = config.username match {
       case null =>
         Http(url)
           .postData(selector)
-          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
           .header("Content-Type", "application/json")
           .header("User-Agent", "spark-cloudant")
       case _ =>
         Http(url)
           .postData(selector)
-          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
           .header("Content-Type", "application/json")
           .header("User-Agent", "spark-cloudant")
           .auth(config.username, config.password)
@@ -68,27 +59,22 @@ class ChangesReceiver(config: CloudantChangesConfig)
 
     clRequest.exec((code, headers, is) => {
       if (code == 200) {
-        scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
-          if (count < limit) {
-            if (line.length() > 0) {
-              val json = Json.parse(line)
-              val jsonDoc = (json \ "doc").getOrElse(null)
-              var doc = ""
-              if (jsonDoc != null) {
-                doc = Json.stringify(jsonDoc)
-                // Verify that doc is not empty and is not deleted
-                val deleted = (jsonDoc \ "_deleted").getOrElse(null)
-                if (!doc.isEmpty && deleted == null) {
-                  store(doc)
-                  count += 1
-                }
-              }
+        if (is != null) {
+          // Decode as UTF-8 explicitly instead of with the platform default charset.
+          val bufferedReader = new BufferedReader(new InputStreamReader(is, "UTF-8"))
+          // readRowFromReader returns null at the end of the feed. The read must not be
+          // placed in the loop condition: a Scala assignment evaluates to Unit, so
+          // `(row = ...) != null` would always be true and the loop would never exit.
+          var row = ChangesRowScanner.readRowFromReader(bufferedReader)
+          while (row != null && !isStopped()) {
+            val doc = row.getDoc
+            // Skip rows without an embedded doc and tombstones of deleted documents.
+            if (doc != null && !doc.has("_deleted")) {
+              store(doc.toString)
             }
-          } else {
-            // exit loop once limit is reached
-            return
+            row = ChangesRowScanner.readRowFromReader(bufferedReader)
           }
-        })
+        }
       } else {
         val status = headers.getOrElse("Status", IndexedSeq.empty)
         val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0)
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
index 94ceadf..aa8a48e 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
@@ -36,6 +36,8 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
 
   var client: CloudantClient = _
   val conf: SparkConf = new SparkConf().setMaster("local[4]")
+  // _id/_rev of the first doc saved to n_flight; CloudantChangesDFSuite deletes it.
+  val deletedDoc = new JsonObject()
   var spark: SparkSession = _
 
   override def beforeAll() {
@@ -81,10 +83,16 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
       // insert docs and design docs from JSON flat files
       for (dbName: String <- TestUtils.testDatabasesList) {
         val db = client.database(dbName, true)
-        val jsonFilePath = System.getProperty("user.dir") +
-          "/src/test/resources/json-files/" + dbName + ".json"
-        if (new File(jsonFilePath).exists()) {
-          val jsonFileArray = new Gson().fromJson(new FileReader(jsonFilePath), classOf[JsonArray])
+        // Read the JSON fixture as a classpath stream (no percent-encoded file path to
+        // decode, works from directories and jars) and always close it.
+        val jsonStream = getClass.getResourceAsStream("/json-files/" + dbName + ".json")
+        if (jsonStream != null) {
+          val jsonFileArray = try {
+            new Gson().fromJson(new java.io.InputStreamReader(jsonStream, "UTF-8"),
+              classOf[JsonArray])
+          } finally {
+            jsonStream.close()
+          }
           val listOfObjects = new util.ArrayList[JsonObject]
           if (jsonFileArray != null) {
             var i = 0
@@ -100,6 +108,10 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
               assert(responses.get(i).getStatusCode == 200 || responses.get(i).getStatusCode == 201)
               i += 1
             }
+            if (dbName == "n_flight") {
+              deletedDoc.addProperty("_id", responses.get(0).getId)
+              deletedDoc.addProperty("_rev", responses.get(0).getRev)
+            }
           }
         }
 
http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
index da51d9f..5e8f6f6 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
@@ -19,8 +19,6 @@ package org.apache.bahir.cloudant
 
 import scala.util.Try
 
-import play.api.libs.json._
-
 import org.apache.spark.sql.SparkSession
 
 class CloudantChangesDFSuite extends ClientSparkFunSuite {
@@ -47,6 +45,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     // Caching df in memory to speed computations
     // and not to retrieve data from cloudant again
     df.cache()
+    // all docs in database minus the design doc
     assert(df.count() == 1967)
   }
 
@@ -62,10 +61,8 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
 
   testIfEnabled("load data and verify deleted doc is not in results") {
     val db = client.database("n_flight", false)
-    // Find then delete a doc to verify it's not included when loading data
-    val doc = db.find("003bd483-9f98-4203-afdd-c539a4f38d21")
-    val json = try { Json.parse(doc) } finally { doc.close() }
-    db.remove((json \ "_id").get.as[String], (json \ "_rev").get.as[String])
+    // delete a saved doc to verify it's not included when loading data
+    db.remove(deletedDoc.get("_id").getAsString, deletedDoc.get("_rev").getAsString)
 
     val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
     // all docs in database minus the design doc and _deleted=true doc
@@ -110,7 +107,6 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     Try {
       client.deleteDB(saveDfToDb)
     }
-
     // Saving dataframe to Cloudant db
     // to create a Cloudant db during save set the option createDBOnSave=true
     val df2 = df.filter(df("_id") >= "CAA")
@@ -123,6 +119,11 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
       .load(saveDfToDb)
 
     assert(dfAirport.count() == 13)
+
+    // delete 'airportcodemapping_df' database
+    Try {
+      client.deleteDB(saveDfToDb)
+    }
   }
 
   // view option tests
