Repository: bahir Updated Branches: refs/heads/master ebdc8b257 -> 785ee1e1a
[BAHIR-138] Fix deprecation warning messages - Imported `spark.implicits._` to convert Spark RDD to Dataset - Replaced deprecated `json(RDD[String])` with `json(Dataset[String])` Closes #63 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/785ee1e1 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/785ee1e1 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/785ee1e1 Branch: refs/heads/master Commit: 785ee1e1acfb129bb0524d79df3372968b9e95a7 Parents: ebdc8b2 Author: Esteban Laver <emla...@us.ibm.com> Authored: Fri Jan 12 00:26:29 2018 -0500 Committer: Luciano Resende <lrese...@apache.org> Committed: Wed Jan 24 23:47:19 2018 -0500 ---------------------------------------------------------------------- .../apache/bahir/cloudant/DefaultSource.scala | 243 ++++++++++--------- 1 file changed, 122 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/785ee1e1/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala index 36c2c78..2685993 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala @@ -30,57 +30,58 @@ import org.apache.bahir.cloudant.internal.ChangesReceiver case class CloudantReadWriteRelation (config: CloudantConfig, schema: StructType, dataFrame: DataFrame = null) - (@transient val sqlContext: SQLContext) + (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with InsertableRelation { - @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)} + @transient lazy val 
dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)} - implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass) + implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass) - def buildScan(requiredColumns: Array[String], + import sqlContext.implicits._ + + def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - val colsLength = requiredColumns.length + val colsLength = requiredColumns.length - if (dataFrame != null) { - if (colsLength == 0) { - dataFrame.select().rdd - } else if (colsLength == 1) { - dataFrame.select(requiredColumns(0)).rdd - } else { - val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i) - dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd - } + if (dataFrame != null) { + if (colsLength == 0) { + dataFrame.select().rdd + } else if (colsLength == 1) { + dataFrame.select(requiredColumns(0)).rdd } else { - implicit val columns : Array[String] = requiredColumns - implicit val origFilters : Array[Filter] = filters - - logger.info("buildScan:" + columns + "," + origFilters) - val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config) - val df = sqlContext.read.json(cloudantRDD) - if (colsLength > 1) { - val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i) - df.select(requiredColumns(0), colsExceptCol0: _*).rdd - } else { - df.rdd - } + val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i) + dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd + } + } else { + implicit val columns : Array[String] = requiredColumns + implicit val origFilters : Array[Filter] = filters + + logger.info("buildScan:" + columns + "," + origFilters) + val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config) + val df = sqlContext.read.json(cloudantRDD.toDS()) + if (colsLength > 1) { + val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i) + df.select(requiredColumns(0), colsExceptCol0: 
_*).rdd + } else { + df.rdd } } - + } def insert(data: DataFrame, overwrite: Boolean): Unit = { - if (config.getCreateDBonSave) { - dataAccess.createDB() - } - if (data.count() == 0) { - logger.warn("Database " + config.getDbname + - ": nothing was saved because the number of records was 0!") - } else { - val result = data.toJSON.foreachPartition { x => - val list = x.toList // Has to pass as List, Iterator results in 0 data - dataAccess.saveAll(list) - } + if (config.getCreateDBonSave) { + dataAccess.createDB() + } + if (data.count() == 0) { + logger.warn("Database " + config.getDbname + + ": nothing was saved because the number of records was 0!") + } else { + val result = data.toJSON.foreachPartition { x => + val list = x.toList // Has to pass as List, Iterator results in 0 data + dataAccess.saveAll(list) } } + } } class DefaultSource extends RelationProvider @@ -91,97 +92,97 @@ class DefaultSource extends RelationProvider def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): CloudantReadWriteRelation = { - create(sqlContext, parameters, null) - } - - private def create(sqlContext: SQLContext, - parameters: Map[String, String], - inSchema: StructType) = { - - val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters) - - var dataFrame: DataFrame = null - - val schema: StructType = { - if (inSchema != null) { - inSchema - } else if (!config.isInstanceOf[CloudantChangesConfig] - || config.viewName != null || config.indexName != null) { - val df = if (config.getSchemaSampleSize == - JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT && - config.viewName == null - && config.indexName == null) { - val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config) - dataFrame = sqlContext.read.json(cloudantRDD) - dataFrame - } else { - val dataAccess = new JsonStoreDataAccess(config) - val aRDD = sqlContext.sparkContext.parallelize( - dataAccess.getMany(config.getSchemaSampleSize)) - sqlContext.read.json(aRDD) - } - 
df.schema + create(sqlContext, parameters, null) + } + + private def create(sqlContext: SQLContext, + parameters: Map[String, String], + inSchema: StructType) = { + + import sqlContext.implicits._ + + val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters) + + var dataFrame: DataFrame = null + + val schema: StructType = { + if (inSchema != null) { + inSchema + } else if (!config.isInstanceOf[CloudantChangesConfig] + || config.viewName != null || config.indexName != null) { + val df = if (config.getSchemaSampleSize == + JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT && + config.viewName == null + && config.indexName == null) { + val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config) + dataFrame = sqlContext.read.json(cloudantRDD.toDS()) + dataFrame } else { - /* Create a streaming context to handle transforming docs in - * larger databases into Spark datasets - */ - val changesConfig = config.asInstanceOf[CloudantChangesConfig] - val ssc = new StreamingContext(sqlContext.sparkContext, - Seconds(changesConfig.getBatchInterval)) - - val changes = ssc.receiverStream( - new ChangesReceiver(changesConfig)) - changes.persist(changesConfig.getStorageLevelForStreaming) - - // Global RDD that's created from union of all RDDs - var globalRDD = ssc.sparkContext.emptyRDD[String] - - logger.info("Loading data from Cloudant using " - + changesConfig.getChangesReceiverUrl) - - // Collect and union each RDD to convert all RDDs to a DataFrame - changes.foreachRDD((rdd: RDD[String]) => { - if (!rdd.isEmpty()) { - if (globalRDD != null) { - // Union RDDs in foreach loop - globalRDD = globalRDD.union(rdd) - } else { - globalRDD = rdd - } + val dataAccess = new JsonStoreDataAccess(config) + val aRDD = sqlContext.sparkContext.parallelize( + dataAccess.getMany(config.getSchemaSampleSize)) + sqlContext.read.json(aRDD.toDS()) + } + df.schema + } else { + /* Create a streaming context to handle transforming docs in + * larger databases into Spark 
datasets + */ + val changesConfig = config.asInstanceOf[CloudantChangesConfig] + val ssc = new StreamingContext(sqlContext.sparkContext, + Seconds(changesConfig.getBatchInterval)) + + val changes = ssc.receiverStream(new ChangesReceiver(changesConfig)) + changes.persist(changesConfig.getStorageLevelForStreaming) + + // Global RDD that's created from union of all RDDs + var globalRDD = ssc.sparkContext.emptyRDD[String] + + logger.info("Loading data from Cloudant using " + + changesConfig.getChangesReceiverUrl) + + // Collect and union each RDD to convert all RDDs to a DataFrame + changes.foreachRDD((rdd: RDD[String]) => { + if (!rdd.isEmpty()) { + if (globalRDD != null) { + // Union RDDs in foreach loop + globalRDD = globalRDD.union(rdd) } else { - // Convert final global RDD[String] to DataFrame - dataFrame = sqlContext.sparkSession.read.json(globalRDD) - ssc.stop(stopSparkContext = false, stopGracefully = false) + globalRDD = rdd } - }) - - ssc.start - // run streaming until all docs from continuous feed are received - ssc.awaitTermination - - if(dataFrame.schema.nonEmpty) { - dataFrame.schema } else { - throw new CloudantException(CloudantChangesConfig.receiverErrorMsg) + // Convert final global RDD[String] to DataFrame + dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS()) + ssc.stop(stopSparkContext = false, stopGracefully = false) } + }) + + ssc.start + // run streaming until all docs from continuous feed are received + ssc.awaitTermination + + if(dataFrame.schema.nonEmpty) { + dataFrame.schema + } else { + throw new CloudantException(CloudantChangesConfig.receiverErrorMsg) } } - CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext) - } - - def createRelation(sqlContext: SQLContext, - mode: SaveMode, - parameters: Map[String, String], - data: DataFrame): CloudantReadWriteRelation = { - val relation = create(sqlContext, parameters, data.schema) - relation.insert(data, mode==SaveMode.Overwrite) - relation } + 
CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext) + } - def createRelation(sqlContext: SQLContext, - parameters: Map[String, String], - schema: StructType): CloudantReadWriteRelation = { - create(sqlContext, parameters, schema) - } + def createRelation(sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): CloudantReadWriteRelation = { + val relation = create(sqlContext, parameters, data.schema) + relation.insert(data, mode==SaveMode.Overwrite) + relation + } + def createRelation(sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): CloudantReadWriteRelation = { + create(sqlContext, parameters, schema) + } }