[ https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097921#comment-16097921 ]

ASF GitHub Bot commented on BAHIR-110:
--------------------------------------

Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r128946219
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.bahir.cloudant.internal
    +
    +import org.slf4j.{Logger, LoggerFactory}
    +import play.api.libs.json.Json
    +import scalaj.http._
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +import org.apache.bahir.cloudant.CloudantChangesConfig
    +import org.apache.bahir.cloudant.common._
    +
    +
    +class ChangesReceiver(config: CloudantChangesConfig)
    +  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
    +
    +  def onStart() {
    +    // Start the thread that receives data over a connection
    +    new Thread("Cloudant Receiver") {
    +      override def run() { receive() }
    +    }.start()
    +  }
    +
    +  private def receive(): Unit = {
    +    // Get total number of docs in database using _all_docs endpoint
    +    val limit = new JsonStoreDataAccess(config)
    +      .getTotalRows(config.getTotalUrl, queryUsed = false)
    +
    +    // Get continuous _changes url
    +    val url = config.getChangesReceiverUrl.toString
    +    val selector: String = {
    +      "{\"selector\":" + config.getSelector + "}"
    +    }
    +
    +    var count = 0
    +    val clRequest: HttpRequest = config.username match {
    +      case null =>
    +        Http(url)
    +          .postData(selector)
    +          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
    +          .header("Content-Type", "application/json")
    +          .header("User-Agent", "spark-cloudant")
    +      case _ =>
    +        Http(url)
    +          .postData(selector)
    +          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
    +          .header("Content-Type", "application/json")
    +          .header("User-Agent", "spark-cloudant")
    +          .auth(config.username, config.password)
    +    }
    +
    +    clRequest.exec((code, headers, is) => {
    +      if (code == 200) {
    +        scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
    +          if (line.length() > 0) {
    +            val json = Json.parse(line)
    +            val jsonDoc = (json \ "doc").getOrElse(null)
    +            var doc = ""
    +            if(jsonDoc != null) {
    +              doc = Json.stringify(jsonDoc)
    +              // Verify that doc is not empty and is not deleted
    +              val deleted = (jsonDoc \ "_deleted").getOrElse(null)
    +              if(!doc.isEmpty && deleted == null) {
    +                store(doc)
    +                count += 1
    +              }
    +            }
    +          } else if (count >= limit) {
    --- End diff --
    
    Fixed in 8b40e38.
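As an aside on the doc-extraction branch in the diff above (the null checks on `jsonDoc` and `_deleted`): the same rule can be expressed with `Option` instead of `null`. A minimal stdlib-only sketch — `ChangeLine` and `storableDoc` are illustrative stand-ins, not names from the PR, and a `Map` replaces the play-json lookup:

```scala
// Sketch only: ChangeLine stands in for a parsed _changes line; the real code
// uses play-json's (json \ "doc") lookup. All names here are illustrative.
case class ChangeLine(doc: Option[Map[String, String]])

// A doc is storable when it is present, non-empty, and not flagged _deleted,
// mirroring the checks in the receiver above.
def storableDoc(line: ChangeLine): Option[Map[String, String]] =
  line.doc.filter(d => d.nonEmpty && !d.contains("_deleted"))
```

With this shape the `var doc = ""` accumulator and the two null comparisons collapse into a single `filter`, and a caller stores via `storableDoc(line).foreach(...)`.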
    
    



> Implement _changes API for non-streaming receiver
> -------------------------------------------------
>
>                 Key: BAHIR-110
>                 URL: https://issues.apache.org/jira/browse/BAHIR-110
>             Project: Bahir
>          Issue Type: Improvement
>            Reporter: Esteban Laver
>   Original Estimate: 216h
>  Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the 
> _all_docs API for the non-streaming receiver. The _all_docs API supports 
> parallel reads (using offset and range), but the _changes API still performs 
> better in most cases (even with only single-threaded support).
> With this ticket we want to:
> a) implement the _changes API for non-streaming receivers
> b) allow customers to pick either the _all_docs (default) or the _changes 
> API endpoint, with documentation about pros and cons
> _changes performance details:
> Successfully loaded Cloudant docs (using a local cloudant-developer docker 
> image) into Spark (local standalone) with the following database sizes: 
> 15 GB (time: 8 1/2 mins), 20 GB (17 mins), 46 GB (25 mins), and 75 GB 
> (48 1/2 mins).
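The endpoint choice described in (b) amounts to a config lookup with a validated default. A hedged sketch — the option name "cloudant.endpoint" and the helper below are hypothetical illustrations, not the actual sql-cloudant configuration keys:

```scala
// Hypothetical sketch: "cloudant.endpoint" is an illustrative option name,
// not the real sql-cloudant config key. The default stays _all_docs, as
// proposed in the ticket; anything else is rejected early.
def endpointFor(options: Map[String, String]): String =
  options.getOrElse("cloudant.endpoint", "_all_docs") match {
    case e @ ("_all_docs" | "_changes") => e
    case other =>
      throw new IllegalArgumentException(s"Unsupported Cloudant endpoint: $other")
  }
```

Validating at configuration time keeps a typo from silently falling back to the default endpoint.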



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
