Repository: bahir
Updated Branches:
  refs/heads/master d2dec8416 -> 180bd890d
Set Spark version to 2.1.2 preparing for release

Update the Spark version to Spark 2.1.2 and update the necessary code to
compile properly against the cited Spark version.

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/180bd890
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/180bd890
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/180bd890

Branch: refs/heads/master
Commit: 180bd890de47374650cb5fa65f20782161e95bf9
Parents: d2dec84
Author: Luciano Resende <[email protected]>
Authored: Wed May 30 11:25:02 2018 -0700
Committer: Luciano Resende <[email protected]>
Committed: Wed May 30 11:25:02 2018 -0700

----------------------------------------------------------------------
 pom.xml                                                      | 2 +-
 .../spark/examples/sql/cloudant/CloudantStreaming.scala      | 2 +-
 .../examples/sql/cloudant/CloudantStreamingSelector.scala    | 2 +-
 .../main/scala/org/apache/bahir/cloudant/DefaultSource.scala | 8 ++++----
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/180bd890/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dc54de1..bb6e101 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
     <log4j.version>1.2.17</log4j.version>

     <!-- Spark version -->
-    <spark.version>2.2.0</spark.version>
+    <spark.version>2.1.2</spark.version>

     <!-- MQTT Client -->
     <mqtt.paho.client>1.1.0</mqtt.paho.client>


http://git-wip-us.apache.org/repos/asf/bahir/blob/180bd890/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
index df00756..4662b04 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
@@ -43,7 +43,7 @@ object CloudantStreaming {
       println(s"========= $time =========")// scalastyle:ignore

       // Convert RDD[String] to Dataset[String]
-      val changesDataFrame = spark.read.json(rdd.toDS())
+      val changesDataFrame = spark.read.json(rdd)
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.printSchema()


http://git-wip-us.apache.org/repos/asf/bahir/blob/180bd890/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
index 05eca9b..8c347bf 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
@@ -49,7 +49,7 @@ object CloudantStreamingSelector {

       // Get the singleton instance of SQLContext
       println(s"========= $time =========") // scalastyle:ignore
-      val changesDataFrame = spark.read.json(rdd.toDS())
+      val changesDataFrame = spark.read.json(rdd)
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.select("*").show()
         batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0)


http://git-wip-us.apache.org/repos/asf/bahir/blob/180bd890/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 47643cc..ee071d0 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -58,7 +58,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
       logger.info("buildScan:" + columns + "," + origFilters)
       val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-      val df = sqlContext.read.json(cloudantRDD.toDS())
+      val df = sqlContext.read.json(cloudantRDD)
       if (colsLength > 1) {
         val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
         df.select(requiredColumns(0), colsExceptCol0: _*).rdd
@@ -115,13 +115,13 @@ class DefaultSource extends RelationProvider
           config.viewPath == null && config.indexPath == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          dataFrame = sqlContext.read.json(cloudantRDD.toDS())
+          dataFrame = sqlContext.read.json(cloudantRDD)
           dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
             dataAccess.getMany(config.getSchemaSampleSize))
-          sqlContext.read.json(aRDD.toDS())
+          sqlContext.read.json(aRDD)
         }
         df.schema
       } else {
@@ -147,7 +147,7 @@ class DefaultSource extends RelationProvider
           globalRDD = rdd ++ globalRDD
         } else {
           // Convert final global RDD[String] to DataFrame
-          dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
+          dataFrame = sqlContext.sparkSession.read.json(globalRDD)
           ssc.stop(stopSparkContext = false, stopGracefully = false)
         }
       })
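The code change is the same in every file: the DataFrameReader.json(Dataset[String]) overload was only introduced in Spark 2.2.0, while Spark 2.1.x exposes json(jsonRDD: RDD[String]), so the .toDS() conversions must be dropped for the project to compile against 2.1.2. Below is a minimal standalone sketch of the two call shapes; the object name, app name, and sample documents are illustrative and not part of the commit.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonReadCompat {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session, only for demonstration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-read-compat")
      .getOrCreate()

    // An RDD[String] of JSON documents, standing in for a Cloudant JsonStoreRDD.
    val jsonRDD: RDD[String] = spark.sparkContext.parallelize(Seq(
      """{"_id": "doc1", "amount": 10}""",
      """{"_id": "doc2", "amount": 32}"""))

    // Spark 2.1.x: DataFrameReader.json takes the RDD[String] directly.
    val df: DataFrame = spark.read.json(jsonRDD)

    // Spark 2.2+ only (fails to compile on 2.1.2), hence the removal of
    // the .toDS() calls in this commit:
    //   import spark.implicits._
    //   val df22 = spark.read.json(jsonRDD.toDS())

    df.printSchema()
    // Mirrors the aggregation in CloudantStreamingSelector.
    println(df.groupBy().sum("amount").collect()(0).getLong(0))

    spark.stop()
  }
}

On Spark 2.2+ the RDD[String] overload still exists but is deprecated in favor of the Dataset[String] variant, which is presumably why the .toDS() conversions had been introduced when the build targeted 2.2.0.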
