[ 
https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082938#comment-16082938
 ] 

ASF GitHub Bot commented on BAHIR-110:
--------------------------------------

Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r126802062
  
    --- Diff: 
sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
    @@ -98,29 +99,89 @@ class DefaultSource extends RelationProvider
     
           val config: CloudantConfig = 
JsonStoreConfigManager.getConfig(sqlContext, parameters)
     
    -      var allDocsDF: DataFrame = null
    +      var dataFrame: DataFrame = null
     
           val schema: StructType = {
             if (inSchema != null) {
               inSchema
    -        } else {
    -          val df = if (config.getSchemaSampleSize() ==
    -            JsonStoreConfigManager.ALL_DOCS_LIMIT &&
    +        } else if (!config.isInstanceOf[CloudantChangesConfig]
    +          || config.viewName != null || config.indexName != null) {
    +          val df = if (config.getSchemaSampleSize ==
    +            JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
                 config.viewName == null
                 && config.indexName == null) {
                 val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, 
config)
    -            allDocsDF = sqlContext.read.json(cloudantRDD)
    -            allDocsDF
    +            dataFrame = sqlContext.read.json(cloudantRDD)
    +            dataFrame
               } else {
                 val dataAccess = new JsonStoreDataAccess(config)
                 val aRDD = sqlContext.sparkContext.parallelize(
    -                dataAccess.getMany(config.getSchemaSampleSize()))
    +                dataAccess.getMany(config.getSchemaSampleSize))
                 sqlContext.read.json(aRDD)
               }
               df.schema
    +        } else {
    +          /* Create a streaming context to handle transforming docs in
    +          * larger databases into Spark datasets
    +          */
    +          /* Allow the raw data and persisted RDDs to be accessible outside
    +          * of the streaming context.
    +          * See https://spark.apache.org/docs/latest/configuration.html
    +          * for more details.
    +          */
    +          sqlContext.sparkSession.conf.set("spark.streaming.unpersist", 
"false")
    +
    +          val ssc = new StreamingContext(sqlContext.sparkContext, 
Seconds(10))
    +          val streamingMap = {
    +            val selector = 
config.asInstanceOf[CloudantChangesConfig].getSelector
    +            if (selector != null) {
    +              Map(
    +                "database" -> config.getDbname,
    +                "selector" -> selector
    +              )
    +            } else {
    +              Map(
    +                "database" -> config.getDbname
    +              )
    +            }
    +          }
    +
    +          val changes = ssc.receiverStream(
    +            new CloudantReceiver(sqlContext.sparkContext.getConf, 
streamingMap))
    +          changes.persist(config.asInstanceOf[CloudantChangesConfig]
    +            .getStorageLevelForStreaming)
    +
    +          // Global RDD that's created from union of all RDDs
    +          var globalRDD = ssc.sparkContext.emptyRDD[String]
    +
    +          logger.info("Loading data from Cloudant using "
    +            + 
config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
    +
    +          // Collect and union each RDD to convert all RDDs to a DataFrame
    +          changes.foreachRDD((rdd: RDD[String]) => {
    +            if (!rdd.isEmpty()) {
    +              if (globalRDD != null) {
    +                // Union RDDs in foreach loop
    +                globalRDD = globalRDD.union(rdd)
    +              } else {
    +                globalRDD = rdd
    +              }
    +            } else {
    +              // Convert final global RDD[String] to DataFrame
    +              dataFrame = sqlContext.sparkSession.read.json(globalRDD)
    +              ssc.stop(stopSparkContext = false, stopGracefully = false)
    +            }
    +          })
    +
    +          ssc.start
    +          // run streaming until all docs from continuous feed are received
    +          ssc.awaitTermination
    +          // ssc.stop(stopSparkContext = false, stopGracefully = false)
    --- End diff --
    
    Fixed in 5588c87.


> Replace use of _all_docs API with _changes API in all receivers
> ---------------------------------------------------------------
>
>                 Key: BAHIR-110
>                 URL: https://issues.apache.org/jira/browse/BAHIR-110
>             Project: Bahir
>          Issue Type: Improvement
>            Reporter: Esteban Laver
>   Original Estimate: 216h
>  Remaining Estimate: 216h
>
> Today we use the _changes API for Spark streaming receiver and _all_docs API 
> for non-streaming receiver. _all_docs API supports parallel reads (using 
> offset and range) but performance of _changes API is still better in most 
> cases (even with single threaded support).
> With this ticket we want to:
> a) re-implement all receivers using _changes API
> b) compare performance between the two implementations based on _changes and 
> _all_docs
> Based on the results in b) we could decide to either
> - replace _all_docs implementation with _changes based implementation OR
> - allow customers to pick one (with a solid documentation about pros and 
> cons) 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to