[ https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097921#comment-16097921 ]
ASF GitHub Bot commented on BAHIR-110: -------------------------------------- Github user emlaver commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r128946219 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bahir.cloudant.internal + +import org.slf4j.{Logger, LoggerFactory} +import play.api.libs.json.Json +import scalaj.http._ + +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + +import org.apache.bahir.cloudant.CloudantChangesConfig +import org.apache.bahir.cloudant.common._ + + +class ChangesReceiver(config: CloudantChangesConfig) + extends Receiver[String](StorageLevel.MEMORY_AND_DISK) { + + def onStart() { + // Start the thread that receives data over a connection + new Thread("Cloudant Receiver") { + override def run() { receive() } + }.start() + } + + private def receive(): Unit = { + // Get total number of docs in database using _all_docs endpoint + val limit = new JsonStoreDataAccess(config) + .getTotalRows(config.getTotalUrl, queryUsed = false) + + // Get continuous _changes url + val url = config.getChangesReceiverUrl.toString + val selector: String = { + "{\"selector\":" + config.getSelector + "}" + } + + var count = 0 + val clRequest: HttpRequest = config.username match { + case null => + Http(url) + .postData(selector) + .timeout(connTimeoutMs = 1000, readTimeoutMs = 0) + .header("Content-Type", "application/json") + .header("User-Agent", "spark-cloudant") + case _ => + Http(url) + .postData(selector) + .timeout(connTimeoutMs = 1000, readTimeoutMs = 0) + .header("Content-Type", "application/json") + .header("User-Agent", "spark-cloudant") + .auth(config.username, config.password) + } + + clRequest.exec((code, headers, is) => { + if (code == 200) { + scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => { + if (line.length() > 0) { + val json = Json.parse(line) + val jsonDoc = (json \ "doc").getOrElse(null) + var doc = "" + if(jsonDoc != null) { + doc = Json.stringify(jsonDoc) + // Verify that doc is not empty and is not deleted + val deleted = (jsonDoc \ "_deleted").getOrElse(null) + if(!doc.isEmpty && deleted == null) { + 
store(doc) + count += 1 + } + } + } else if (count >= limit) { --- End diff -- Fixed in 8b40e38. > Implement _changes API for non-streaming receiver > ------------------------------------------------- > > Key: BAHIR-110 > URL: https://issues.apache.org/jira/browse/BAHIR-110 > Project: Bahir > Issue Type: Improvement > Reporter: Esteban Laver > Original Estimate: 216h > Remaining Estimate: 216h > > Today we use the _changes API for the Spark streaming receiver and the _all_docs API > for the non-streaming receiver. The _all_docs API supports parallel reads (using > offset and range), but the performance of the _changes API is still better in most > cases (even with single-threaded support). > With this ticket we want to: > a) implement the _changes API for non-streaming receivers > b) allow customers to pick either the _all_docs (default) or the _changes API > endpoint, with documentation about the pros and cons > _changes performance details: > Successfully loaded Cloudant (using local cloudant-developer docker image) > docs into Spark (local standalone) with the following database sizes: 15GB > (time: 8 1/2 mins), 20GB (17 mins), 46GB (25 mins), and 75GB (48 1/2 mins). -- This message was sent by Atlassian JIRA (v6.4.14#64029)