[ https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16096674#comment-16096674 ]

ASF GitHub Bot commented on BAHIR-110:
--------------------------------------

Github user mayya-sharipova commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r128830778
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.bahir.cloudant.internal
    +
    +import org.slf4j.{Logger, LoggerFactory}
    +import play.api.libs.json.Json
    +import scalaj.http._
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +import org.apache.bahir.cloudant.CloudantChangesConfig
    +import org.apache.bahir.cloudant.common._
    +
    +
    +class ChangesReceiver(config: CloudantChangesConfig)
    +  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
    +
    +  def onStart() {
    +    // Start the thread that receives data over a connection
    +    new Thread("Cloudant Receiver") {
    +      override def run() { receive() }
    +    }.start()
    +  }
    +
    +  private def receive(): Unit = {
    +    // Get total number of docs in database using _all_docs endpoint
    +    val limit = new JsonStoreDataAccess(config)
    +      .getTotalRows(config.getTotalUrl, queryUsed = false)
    +
    +    // Get continuous _changes url
    +    val url = config.getChangesReceiverUrl.toString
    +    val selector: String = {
    +      "{\"selector\":" + config.getSelector + "}"
    +    }
    +
    +    var count = 0
    +    val clRequest: HttpRequest = config.username match {
    +      case null =>
    +        Http(url)
    +          .postData(selector)
    +          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
    +          .header("Content-Type", "application/json")
    +          .header("User-Agent", "spark-cloudant")
    +      case _ =>
    +        Http(url)
    +          .postData(selector)
    +          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
    +          .header("Content-Type", "application/json")
    +          .header("User-Agent", "spark-cloudant")
    +          .auth(config.username, config.password)
    +    }
    +
    +    clRequest.exec((code, headers, is) => {
    +      if (code == 200) {
    +        scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
    +          if (line.length() > 0) {
    +            val json = Json.parse(line)
    +            val jsonDoc = (json \ "doc").getOrElse(null)
    +            var doc = ""
    +            if(jsonDoc != null) {
    +              doc = Json.stringify(jsonDoc)
    +              // Verify that doc is not empty and is not deleted
    +              val deleted = (jsonDoc \ "_deleted").getOrElse(null)
    +              if(!doc.isEmpty && deleted == null) {
    +                store(doc)
    +                count += 1
    +              }
    +            }
    +          } else if (count >= limit) {
    --- End diff --
    
    Should we check `(count >= limit)` before line 72 for all objects, not only empty ones?
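
    A minimal sketch of what the suggested change could look like. This is not the PR code: `receiveAll` and the in-memory buffer are hypothetical stand-ins for the receiver loop and `store(doc)`, used only to show the limit being checked once per line, before parsing, rather than only on empty lines.

    ```scala
    object LimitCheckSketch {
      // Stand-in for the receive loop: stop once `limit` docs are stored,
      // checking the limit for every line, not just empty heartbeat lines.
      def receiveAll(lines: Iterator[String], limit: Int): Seq[String] = {
        val stored = scala.collection.mutable.ArrayBuffer[String]()
        var count = 0
        while (lines.hasNext && count < limit) { // limit checked on every iteration
          val line = lines.next()
          if (line.nonEmpty) {
            stored += line // stands in for store(doc)
            count += 1
          }
        }
        stored.toSeq
      }

      def main(args: Array[String]): Unit = {
        val feed = Iterator("doc1", "doc2", "doc3", "doc4")
        println(receiveAll(feed, limit = 2).mkString(",")) // doc1,doc2
      }
    }
    ```

    With this shape the receiver terminates after `limit` stored docs even if the feed never emits an empty line.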


> Replace use of _all_docs API with _changes API in all receivers
> ---------------------------------------------------------------
>
>                 Key: BAHIR-110
>                 URL: https://issues.apache.org/jira/browse/BAHIR-110
>             Project: Bahir
>          Issue Type: Improvement
>            Reporter: Esteban Laver
>   Original Estimate: 216h
>  Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the 
> _all_docs API for the non-streaming receiver. The _all_docs API supports 
> parallel reads (using offset and range), but the performance of the 
> _changes API is still better in most cases (even with single-threaded 
> support).
> With this ticket we want to:
> a) re-implement all receivers using the _changes API
> b) compare performance between the two implementations based on _changes 
> and _all_docs
> Based on the results in b) we could decide to either
> - replace the _all_docs implementation with a _changes-based implementation, OR
> - allow customers to pick one (with solid documentation about the pros and 
> cons)
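
A sketch of the parallel-read partitioning the description refers to. The names (`AllDocsPartitioner`, `DocRange`) are illustrative and not the Bahir API; this only shows how a total row count can be split into contiguous skip/limit ranges that workers could fetch from _all_docs in parallel.

```scala
object AllDocsPartitioner {
  // One contiguous slice of the _all_docs result set.
  final case class DocRange(skip: Int, limit: Int)

  // Divide totalRows into at most numPartitions skip/limit ranges,
  // dropping any trailing empty range.
  def partitions(totalRows: Int, numPartitions: Int): Seq[DocRange] = {
    val size = math.ceil(totalRows.toDouble / numPartitions).toInt
    (0 until numPartitions)
      .map(i => DocRange(i * size, math.min(size, totalRows - i * size)))
      .filter(_.limit > 0)
  }

  def main(args: Array[String]): Unit = {
    // e.g. 10 rows over 3 workers -> ranges of 4, 4 and 2 rows
    partitions(10, 3).foreach(println)
  }
}
```

This is the kind of offset/range splitting that _all_docs allows but a single continuous _changes feed does not, which is the trade-off the ticket proposes to measure.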



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
