Repository: bahir
Updated Branches:
  refs/heads/master 56e1deed3 -> 612f22b24


Set Spark version to 2.2.2 in preparation for release


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/612f22b2
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/612f22b2
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/612f22b2

Branch: refs/heads/master
Commit: 612f22b24457c0046c5f4ef955cdd7c3cfc9a134
Parents: 56e1dee
Author: Luciano Resende <[email protected]>
Authored: Wed Nov 7 17:41:55 2018 -0800
Committer: Luciano Resende <[email protected]>
Committed: Wed Nov 7 17:43:23 2018 -0800

----------------------------------------------------------------------
 pom.xml                                                      | 2 +-
 .../spark/examples/sql/cloudant/CloudantStreaming.scala      | 2 +-
 .../examples/sql/cloudant/CloudantStreamingSelector.scala    | 2 +-
 .../main/scala/org/apache/bahir/cloudant/DefaultSource.scala | 8 ++++----
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/612f22b2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9bd1cec..1edd641 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Spark version -->
-    <spark.version>2.1.3</spark.version>
+    <spark.version>2.2.2</spark.version>
 
     <!-- MQTT Client -->
     <mqtt.paho.client>1.1.0</mqtt.paho.client>

http://git-wip-us.apache.org/repos/asf/bahir/blob/612f22b2/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
index 4662b04..df00756 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
@@ -43,7 +43,7 @@ object CloudantStreaming {
 
       println(s"========= $time =========")// scalastyle:ignore
       // Convert RDD[String] to Dataset[String]
-      val changesDataFrame = spark.read.json(rdd)
+      val changesDataFrame = spark.read.json(rdd.toDS())
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.printSchema()
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/612f22b2/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
index 8c347bf..05eca9b 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
@@ -49,7 +49,7 @@ object CloudantStreamingSelector {
       // Get the singleton instance of SQLContext
 
       println(s"========= $time =========") // scalastyle:ignore
-      val changesDataFrame = spark.read.json(rdd)
+      val changesDataFrame = spark.read.json(rdd.toDS())
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.select("*").show()
        batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0)

http://git-wip-us.apache.org/repos/asf/bahir/blob/612f22b2/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index ee071d0..47643cc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -58,7 +58,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
 
       logger.info("buildScan:" + columns + "," + origFilters)
       val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-      val df = sqlContext.read.json(cloudantRDD)
+      val df = sqlContext.read.json(cloudantRDD.toDS())
       if (colsLength > 1) {
        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
         df.select(requiredColumns(0), colsExceptCol0: _*).rdd
@@ -115,13 +115,13 @@ class DefaultSource extends RelationProvider
           config.viewPath == null
           && config.indexPath == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          dataFrame = sqlContext.read.json(cloudantRDD)
+          dataFrame = sqlContext.read.json(cloudantRDD.toDS())
           dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
             dataAccess.getMany(config.getSchemaSampleSize))
-          sqlContext.read.json(aRDD)
+          sqlContext.read.json(aRDD.toDS())
         }
         df.schema
       } else {
@@ -147,7 +147,7 @@ class DefaultSource extends RelationProvider
             globalRDD = rdd ++ globalRDD
           } else {
             // Convert final global RDD[String] to DataFrame
-            dataFrame = sqlContext.sparkSession.read.json(globalRDD)
+            dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
             ssc.stop(stopSparkContext = false, stopGracefully = false)
           }
         })

Reply via email to