Repository: bahir
Updated Branches:
  refs/heads/master 57cb15fd3 -> 0e1505a89


[BAHIR-128] Improve sql-cloudant _changes receiver

This change improves the stability of the _changes receiver and
fixes the intermittent failing test in sql-cloudant's
CloudantChangesDFSuite.

How

Improve performance and decrease testing time by setting the batch
interval to 8 seconds and using the seq_interval _changes feed option.
Use getResource to load JSON file paths
Added Mike Rhodes's ChangesRowScanner for reading each _changes line
and transforming to GSON's JSON object
Added Mike Rhodes's ChangesRow representing a row in the changes feed

Closes #57


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/0e1505a8
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/0e1505a8
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/0e1505a8

Branch: refs/heads/master
Commit: 0e1505a8960bfe40ea025267bbf36ec5c4cf5c79
Parents: 57cb15f
Author: Esteban Laver <[email protected]>
Authored: Fri Sep 8 10:33:26 2017 -0400
Committer: Luciano Resende <[email protected]>
Committed: Tue Dec 19 20:26:50 2017 -0800

----------------------------------------------------------------------
 .../bahir/cloudant/common/ChangesRow.java       | 77 ++++++++++++++++++++
 .../cloudant/common/ChangesRowScanner.java      | 77 ++++++++++++++++++++
 .../bahir/cloudant/CloudantChangesConfig.scala  |  3 +-
 .../apache/bahir/cloudant/DefaultSource.scala   |  2 +-
 .../cloudant/common/JsonStoreDataAccess.scala   |  2 +-
 .../cloudant/internal/ChangesReceiver.scala     | 41 +++--------
 .../bahir/cloudant/ClientSparkFunSuite.scala    | 13 +++-
 .../bahir/cloudant/CloudantChangesDFSuite.scala | 15 ++--
 8 files changed, 185 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java 
b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java
new file mode 100644
index 0000000..a3ed125
--- /dev/null
+++ 
b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRow.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.util.List;
+
+/**
+ * Class representing a single row in a changes feed. Structure:
+ *
+ * {
+ *   last_seq": 5
+ *   "results": [
+ *     ---*** This next items is the ChangesRow ***---
+ *     {
+ *       "changes": [ {"rev": "2-eec205a9d413992850a6e32678485900"}, ... ],
+ *       "deleted": true,
+ *       "id": "deleted",
+ *       "seq": 5,
+ *       "doc": ... structure ...
+ *     }
+ *   ]
+ * }
+ */
+public class ChangesRow {
+
+    public class Rev {
+        private String rev;
+
+        public String getRev() {
+            return rev;
+        }
+    }
+
+    private List<Rev> changes;
+    public Boolean deleted;
+    private String id;
+    private JsonElement seq;
+    private JsonObject doc;
+
+    public List<Rev> getChanges() {
+        return changes;
+    }
+
+    public String getSeq() {
+        if (seq.isJsonNull()) {
+            return null;
+        } else {
+            return seq.toString();
+        }
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public JsonObject getDoc() {
+        return doc;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
 
b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
new file mode 100644
index 0000000..8fa2cde
--- /dev/null
+++ 
b/sql-cloudant/src/main/java/org/apache/bahir/cloudant/common/ChangesRowScanner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common;
+
+import com.google.gson.Gson;
+import java.io.BufferedReader;
+import java.io.IOException;
+
+/**
+ * This scanner will read through a _changes stream until it finds the
+ * next meaningful row, either a change entry or the closing line with
+ * the lastSeq and, perhaps, pending changes (for normal/longpoll feeds).
+ */
+public class ChangesRowScanner {
+
+    private static final Gson gson = new Gson();
+
+    /**
+     * Read up to the next meaningful line from the changes feed, and calls
+     * the passed delegate depending on what it finds. Works for all styles of
+     * changes feed (normal, longpoll, continuous).
+     *
+     * @return True if should continue
+     *
+     * @throws IOException if there's a problem reading the stream
+     */
+    public static ChangesRow readRowFromReader(BufferedReader changesReader)
+            throws IOException {
+
+        String line;
+
+        // Read the next line (empty = heartbeat, ignore; null = end of stream)
+        while ((line = changesReader.readLine()) != null) {
+            if (line.isEmpty()) {
+                continue;
+            }
+            if (line.startsWith("{\"results\":")) {
+                // ignore, just the START of the result set in normal/longpoll 
mode
+                continue;
+            } else if (line.startsWith("],")) {
+                // ignore, just the END of the result set in normal/longpoll 
mode
+                continue;
+            }
+            break;
+        }
+
+        if(line != null) {
+            if (line.startsWith("\"last_seq\":")) {
+                return null; // End of feed
+            } else if (line.startsWith("{\"last_seq\":")) {
+                return null; // End of feed
+            } else {
+                if (line.endsWith(",")) {
+                    line = line.substring(0, line.length() - 1);
+                }
+                ChangesRow r = gson.fromJson(line, ChangesRow.class);
+                return r; // not end of feed
+            }
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
index 4c4e0b2..0e70b95 100644
--- 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
+++ 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
@@ -67,7 +67,8 @@ class CloudantChangesConfig(protocol: String, host: String, 
dbName: String,
   }
 
   def getChangesReceiverUrl: String = {
-    var url = dbUrl + "/" + defaultIndex + 
"?include_docs=true&feed=continuous&timeout=" + timeout
+    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" +
+      "&seq_interval=" + bulkSize + "&timeout=" + timeout
     if (getSelector != null) {
       url = url + "&filter=_selector"
     }

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 37f2f1b..1596133 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -125,7 +125,7 @@ class DefaultSource extends RelationProvider
           /* Create a streaming context to handle transforming docs in
           * larger databases into Spark datasets
           */
-          val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+          val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8))
 
           val changesConfig = config.asInstanceOf[CloudantChangesConfig]
           val changes = ssc.receiverStream(

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
index df5a18b..9d09ecb 100644
--- 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
+++ 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -130,7 +130,7 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
         " request error: " + clResponse.body)
     }
     val data = postProcessor(clResponse.body)
-    logger.debug(s"got result:$data")
+    logger.debug(s"got result: $data")
     data
   }
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
index de026ba..ac0aac6 100644
--- 
a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
+++ 
b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
@@ -16,18 +16,16 @@
  */
 package org.apache.bahir.cloudant.internal
 
-import org.slf4j.{Logger, LoggerFactory}
-import play.api.libs.json.Json
+import java.io.{BufferedReader, InputStreamReader}
+
 import scalaj.http._
 
-import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver
 
 import org.apache.bahir.cloudant.CloudantChangesConfig
 import org.apache.bahir.cloudant.common._
 
-
 class ChangesReceiver(config: CloudantChangesConfig)
   extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
 
@@ -39,28 +37,21 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
 
   private def receive(): Unit = {
-    // Get total number of docs in database using _all_docs endpoint
-    val limit = new JsonStoreDataAccess(config)
-      .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-    // Get continuous _changes url
+    // Get normal _changes url
     val url = config.getChangesReceiverUrl.toString
     val selector: String = {
       "{\"selector\":" + config.getSelector + "}"
     }
 
-    var count = 0
     val clRequest: HttpRequest = config.username match {
       case null =>
         Http(url)
           .postData(selector)
-          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
           .header("Content-Type", "application/json")
           .header("User-Agent", "spark-cloudant")
       case _ =>
         Http(url)
           .postData(selector)
-          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
           .header("Content-Type", "application/json")
           .header("User-Agent", "spark-cloudant")
           .auth(config.username, config.password)
@@ -68,27 +59,15 @@ class ChangesReceiver(config: CloudantChangesConfig)
 
     clRequest.exec((code, headers, is) => {
       if (code == 200) {
-        scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line 
=> {
-          if (count < limit) {
-            if (line.length() > 0) {
-              val json = Json.parse(line)
-              val jsonDoc = (json \ "doc").getOrElse(null)
-              var doc = ""
-              if (jsonDoc != null) {
-                doc = Json.stringify(jsonDoc)
-                // Verify that doc is not empty and is not deleted
-                val deleted = (jsonDoc \ "_deleted").getOrElse(null)
-                if (!doc.isEmpty && deleted == null) {
-                  store(doc)
-                  count += 1
-                }
-              }
+        var json = new ChangesRow()
+        if (is != null) {
+          val bufferedReader = new BufferedReader(new InputStreamReader(is))
+          while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) 
!= null) {
+            if (!isStopped() && json != null && !json.getDoc.has("_deleted")) {
+              store(json.getDoc.toString)
             }
-          } else {
-            // exit loop once limit is reached
-            return
           }
-        })
+        }
       } else {
         val status = headers.getOrElse("Status", IndexedSeq.empty)
         val errorMsg = "Error retrieving _changes feed " + config.getDbname + 
": " + status(0)

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
 
b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
index 94ceadf..aa8a48e 100644
--- 
a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
+++ 
b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
@@ -36,6 +36,7 @@ class ClientSparkFunSuite extends SparkFunSuite with 
BeforeAndAfter {
 
   var client: CloudantClient = _
   val conf: SparkConf = new SparkConf().setMaster("local[4]")
+  var deletedDoc = new JsonObject()
   var spark: SparkSession = _
 
   override def beforeAll() {
@@ -81,10 +82,10 @@ class ClientSparkFunSuite extends SparkFunSuite with 
BeforeAndAfter {
     // insert docs and design docs from JSON flat files
     for (dbName: String <- TestUtils.testDatabasesList) {
       val db = client.database(dbName, true)
-      val jsonFilePath = System.getProperty("user.dir") +
-        "/src/test/resources/json-files/" + dbName + ".json"
-      if (new File(jsonFilePath).exists()) {
-        val jsonFileArray = new Gson().fromJson(new FileReader(jsonFilePath), 
classOf[JsonArray])
+      val jsonFilePath = getClass.getResource("/json-files/" + dbName + 
".json")
+      if (jsonFilePath != null && new File(jsonFilePath.getFile).exists()) {
+        val jsonFileArray = new Gson().fromJson(new 
FileReader(jsonFilePath.getFile),
+          classOf[JsonArray])
         val listOfObjects = new util.ArrayList[JsonObject]
         if (jsonFileArray != null) {
           var i = 0
@@ -100,6 +101,10 @@ class ClientSparkFunSuite extends SparkFunSuite with 
BeforeAndAfter {
           assert(responses.get(i).getStatusCode == 200 || 
responses.get(i).getStatusCode == 201)
           i += 1
         }
+        if (dbName == "n_flight") {
+          deletedDoc.addProperty("_id", responses.get(0).getId)
+          deletedDoc.addProperty("_rev", responses.get(0).getRev)
+        }
 
       }
     }

http://git-wip-us.apache.org/repos/asf/bahir/blob/0e1505a8/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
 
b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
index da51d9f..5e8f6f6 100644
--- 
a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
+++ 
b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
@@ -19,8 +19,6 @@ package org.apache.bahir.cloudant
 
 import scala.util.Try
 
-import play.api.libs.json._
-
 import org.apache.spark.sql.SparkSession
 
 class CloudantChangesDFSuite extends ClientSparkFunSuite {
@@ -47,6 +45,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     // Caching df in memory to speed computations
     // and not to retrieve data from cloudant again
     df.cache()
+
     // all docs in database minus the design doc
     assert(df.count() == 1967)
   }
@@ -62,10 +61,8 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
 
   testIfEnabled("load data and verify deleted doc is not in results") {
     val db = client.database("n_flight", false)
-    // Find then delete a doc to verify it's not included when loading data
-    val doc = db.find("003bd483-9f98-4203-afdd-c539a4f38d21")
-    val json = try {  Json.parse(doc) } finally { doc.close() }
-    db.remove((json \ "_id").get.as[String], (json \ "_rev").get.as[String])
+    // delete a saved doc to verify it's not included when loading data
+    db.remove(deletedDoc.get("_id").getAsString, 
deletedDoc.get("_rev").getAsString)
 
     val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
     // all docs in database minus the design doc and _deleted=true doc
@@ -110,7 +107,6 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     Try {
       client.deleteDB(saveDfToDb)
     }
-
     // Saving dataframe to Cloudant db
     // to create a Cloudant db during save set the option createDBOnSave=true
     val df2 = df.filter(df("_id") >= "CAA")
@@ -123,6 +119,11 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
       .load(saveDfToDb)
 
     assert(dfAirport.count() == 13)
+
+    // delete 'airportcodemapping_df' database
+    Try {
+      client.deleteDB(saveDfToDb)
+    }
   }
 
   // view option tests

Reply via email to