[ https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16087380#comment-16087380 ]
ASF GitHub Bot commented on BAHIR-110:
--------------------------------------

Github user mayya-sharipova commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r127467315

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
```diff
@@ -98,29 +99,81 @@ class DefaultSource extends RelationProvider
     val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)

-    var allDocsDF: DataFrame = null
+    var dataFrame: DataFrame = null

     val schema: StructType = {
       if (inSchema != null) {
         inSchema
-      } else {
-        val df = if (config.getSchemaSampleSize() ==
-          JsonStoreConfigManager.ALL_DOCS_LIMIT &&
+      } else if (!config.isInstanceOf[CloudantChangesConfig]
+          || config.viewName != null || config.indexName != null) {
+        val df = if (config.getSchemaSampleSize ==
+          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
           config.viewName == null && config.indexName == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          allDocsDF = sqlContext.read.json(cloudantRDD)
-          allDocsDF
+          dataFrame = sqlContext.read.json(cloudantRDD)
+          dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
-            dataAccess.getMany(config.getSchemaSampleSize()))
+            dataAccess.getMany(config.getSchemaSampleSize))
           sqlContext.read.json(aRDD)
         }
         df.schema
+      } else {
+        /* Create a streaming context to handle transforming docs in
+         * larger databases into Spark datasets
+         */
+        val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+        val streamingMap = {
+          val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
+          if (selector != null) {
+            Map(
+              "database" -> config.getDbname,
+              "selector" -> selector
+            )
+          } else {
+            Map(
+              "database" -> config.getDbname
+            )
+          }
+        }
+
+        val changes = ssc.receiverStream(
+          new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
+        changes.persist(config.asInstanceOf[CloudantChangesConfig]
+          .getStorageLevelForStreaming)
+
+        // Global RDD that's created from union of all RDDs
+        var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+        logger.info("Loading data from Cloudant using " +
+          config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
```
--- End diff --

@emlaver Here, while trying to load a database, I am getting the message:

    Loading data from Cloudant using https://XXXX.cloudant.com/n_airportcodemapping/_changes?include_docs=true&feed=continuous&heartbeat=3000

We should NOT load data into Spark SQL using the `continuous` feed, which for a constantly updating database may never complete. The whole point of loading a database into Spark SQL is to load a snapshot of the database at a particular point in time. Use the `normal` feed here.
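For illustration, a minimal Scala sketch contrasting the two feed modes. This is not the PR's code; the URL shape mirrors the logged one above, and `ACCOUNT` is a placeholder:

```scala
// Minimal sketch, not the Bahir implementation: it only contrasts the two
// Cloudant/CouchDB _changes feed modes. `ACCOUNT` is a placeholder.
object ChangesFeedUrls {
  val dbUrl = "https://ACCOUNT.cloudant.com/n_airportcodemapping"

  // continuous: the connection stays open and streams new changes as they
  // arrive, so for a constantly updated database the read may never finish
  val continuousChangesUrl: String =
    s"$dbUrl/_changes?include_docs=true&feed=continuous&heartbeat=3000"

  // normal: the server returns changes up to the current update sequence
  // and then closes the response, i.e. a snapshot suitable for a one-shot
  // Spark SQL load
  val normalChangesUrl: String =
    s"$dbUrl/_changes?include_docs=true&feed=normal"
}
```

With `feed=normal`, the response ends at the database's current update sequence, which is exactly the point-in-time snapshot a Spark SQL load needs.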
> Replace use of _all_docs API with _changes API in all receivers
> ---------------------------------------------------------------
>
>                 Key: BAHIR-110
>                 URL: https://issues.apache.org/jira/browse/BAHIR-110
>             Project: Bahir
>          Issue Type: Improvement
>            Reporter: Esteban Laver
>   Original Estimate: 216h
>  Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the
> _all_docs API for the non-streaming receiver. The _all_docs API supports
> parallel reads (using offset and range), but the performance of the
> _changes API is still better in most cases (even with single-threaded
> support).
>
> With this ticket we want to:
> a) re-implement all receivers using the _changes API
> b) compare performance between the two implementations, based on _changes
> and _all_docs
>
> Based on the results in b), we could decide to either:
> - replace the _all_docs implementation with the _changes-based
>   implementation, OR
> - allow customers to pick one (with solid documentation about the pros
>   and cons)
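As a hedged illustration of the parallel reads mentioned in the description above (the _all_docs endpoint accepts skip/limit paging parameters; all names and numbers below are illustrative, not Bahir's):

```scala
// Hedged sketch of the parallel-read pattern _all_docs allows: skip/limit
// paging splits the document space into independent slices, one per Spark
// task. `dbUrl`, `totalDocs`, and the partition count are illustrative.
object AllDocsPartitions {
  val dbUrl = "https://ACCOUNT.cloudant.com/n_airportcodemapping"
  val totalDocs = 100000   // e.g. doc_count from the database info endpoint
  val partitions = 4
  val pageSize = (totalDocs + partitions - 1) / partitions

  // one independent URL per partition; each slice can be fetched in parallel
  val partitionUrls: Seq[String] = (0 until partitions).map { i =>
    s"$dbUrl/_all_docs?include_docs=true&limit=$pageSize&skip=${i * pageSize}"
  }
}
```

A _changes read, by contrast, walks a single change sequence, which is why it is effectively single-threaded even when it performs better overall.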