[ https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16087380#comment-16087380 ]

ASF GitHub Bot commented on BAHIR-110:
--------------------------------------

Github user mayya-sharipova commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r127467315
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
    @@ -98,29 +99,81 @@ class DefaultSource extends RelationProvider
     
           val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
     
    -      var allDocsDF: DataFrame = null
    +      var dataFrame: DataFrame = null
     
           val schema: StructType = {
             if (inSchema != null) {
               inSchema
    -        } else {
    -          val df = if (config.getSchemaSampleSize() ==
    -            JsonStoreConfigManager.ALL_DOCS_LIMIT &&
    +        } else if (!config.isInstanceOf[CloudantChangesConfig]
    +          || config.viewName != null || config.indexName != null) {
    +          val df = if (config.getSchemaSampleSize ==
    +            JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
                 config.viewName == null
                 && config.indexName == null) {
                 val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
    -            allDocsDF = sqlContext.read.json(cloudantRDD)
    -            allDocsDF
    +            dataFrame = sqlContext.read.json(cloudantRDD)
    +            dataFrame
               } else {
                 val dataAccess = new JsonStoreDataAccess(config)
                 val aRDD = sqlContext.sparkContext.parallelize(
    -                dataAccess.getMany(config.getSchemaSampleSize()))
    +                dataAccess.getMany(config.getSchemaSampleSize))
                 sqlContext.read.json(aRDD)
               }
               df.schema
    +        } else {
    +          /* Create a streaming context to handle transforming docs in
    +          * larger databases into Spark datasets
    +          */
    +          val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
    +          val streamingMap = {
    +            val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
    +            if (selector != null) {
    +              Map(
    +                "database" -> config.getDbname,
    +                "selector" -> selector
    +              )
    +            } else {
    +              Map(
    +                "database" -> config.getDbname
    +              )
    +            }
    +          }
    +
    +          val changes = ssc.receiverStream(
    +            new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
    +          changes.persist(config.asInstanceOf[CloudantChangesConfig]
    +            .getStorageLevelForStreaming)
    +
    +          // Global RDD that's created from union of all RDDs
    +          var globalRDD = ssc.sparkContext.emptyRDD[String]
    +
    +          logger.info("Loading data from Cloudant using "
    +            + config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
    +
    --- End diff --
    
    @emlaver  Here while trying to load a db, I am getting a message "Loading data from Cloudant using
    https://XXXX.cloudant.com/n_airportcodemapping/_changes?include_docs=true&feed=continuous&heartbeat=3000"
    
    We should NOT load data into Spark SQL using the `continuous` feed (which, for a
    constantly updating database, may never finish). The whole point of loading a db
    into Spark SQL is to load a snapshot of the db at a particular point in time.
    Use the `normal` feed here.
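
For illustration only, here is a minimal sketch of what a snapshot-style load could
look like with `feed=normal` (a finite response) instead of `feed=continuous`. The
endpoint string and the `parseChangesResults` helper below are assumptions for the
sketch, not part of the connector's API; only the final `sqlContext.read.json` step
mirrors what `DefaultSource` already does.

    // Hypothetical sketch (not the connector's code): load a point-in-time
    // snapshot by requesting the _changes feed with feed=normal, which
    // returns a complete response and ends, instead of feed=continuous,
    // which never ends for a constantly updated database. Auth is omitted.
    val dbEndpoint = "https://XXXX.cloudant.com/n_airportcodemapping"  // assumed
    val normalChangesUrl = dbEndpoint + "/_changes?include_docs=true&feed=normal"

    // feed=normal closes the response after the last change, so this read
    // terminates even while the database keeps receiving updates.
    val responseBody = scala.io.Source.fromURL(normalChangesUrl).mkString

    // Extract each row's "doc" field; the real connector does this inside its
    // receiver / data-access classes. parseChangesResults is a hypothetical helper.
    val docs: Seq[String] = parseChangesResults(responseBody)

    // Same pattern DefaultSource uses to build a DataFrame from JSON strings.
    val jsonRDD = sqlContext.sparkContext.parallelize(docs)
    val snapshotDF = sqlContext.read.json(jsonRDD)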


> Replace use of _all_docs API with _changes API in all receivers
> ---------------------------------------------------------------
>
>                 Key: BAHIR-110
>                 URL: https://issues.apache.org/jira/browse/BAHIR-110
>             Project: Bahir
>          Issue Type: Improvement
>            Reporter: Esteban Laver
>   Original Estimate: 216h
>  Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the
> _all_docs API for the non-streaming receiver. The _all_docs API supports
> parallel reads (using offset and range), but the performance of the _changes
> API is still better in most cases (even with single-threaded support).
> With this ticket we want to:
> a) re-implement all receivers using the _changes API
> b) compare performance between the two implementations based on _changes and
> _all_docs
> Based on the results in b) we could decide to either
> - replace the _all_docs implementation with the _changes-based implementation, OR
> - allow customers to pick one (with solid documentation about the pros and
> cons)
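
As a rough illustration of the parallel-reads point above: _all_docs can be paged
with skip/limit so each Spark partition fetches its own slice, whereas a _changes
request is a single sequential read. The `dbEndpoint`, `totalDocs`, and `fetchPage`
names below are assumptions for the sketch, not part of the connector's API.

    // Hypothetical sketch: partitioned _all_docs reads. Each Spark partition
    // requests one page via skip/limit, so the HTTP calls run concurrently
    // across executors.
    val dbEndpoint = "https://XXXX.cloudant.com/n_airportcodemapping"  // assumed
    val pageSize = 10000
    val totalDocs = 1000000  // assumed; could come from the database's doc_count
    val numPages = (totalDocs + pageSize - 1) / pageSize

    val docsRDD = sqlContext.sparkContext
      .parallelize(0 until numPages, numPages)
      .flatMap { page =>
        val url = s"$dbEndpoint/_all_docs?include_docs=true" +
          s"&skip=${page * pageSize}&limit=$pageSize"
        fetchPage(url)  // hypothetical helper returning the page's doc JSON strings
      }

    val allDocsDF = sqlContext.read.json(docsRDD)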



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
