Repository: bahir
Updated Branches:
  refs/heads/master b36632a1a -> a5ea67906


Set Spark version to 2.2.1 in preparation for release


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a5ea6790
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a5ea6790
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a5ea6790

Branch: refs/heads/master
Commit: a5ea6790620a2c1bad85561e4142efd05e9bfdce
Parents: b36632a
Author: Luciano Resende <lrese...@apache.org>
Authored: Wed Jun 6 15:05:37 2018 +0200
Committer: Luciano Resende <lrese...@apache.org>
Committed: Wed Jun 6 15:14:00 2018 +0200

----------------------------------------------------------------------
 pom.xml                                                      | 2 +-
 .../spark/examples/sql/cloudant/CloudantStreaming.scala      | 2 +-
 .../examples/sql/cloudant/CloudantStreamingSelector.scala    | 2 +-
 .../main/scala/org/apache/bahir/cloudant/DefaultSource.scala | 8 ++++----
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
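
Context for the sql-cloudant hunks below: as of Spark 2.2.0, DataFrameReader.json(RDD[String]) is deprecated in favor of json(Dataset[String]), so each call site now converts its RDD with toDS() before reading. A minimal illustrative sketch of the pattern (not code from this commit; ReadSketch and readChanges are made-up names):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object ReadSketch {
      def readChanges(spark: SparkSession, rdd: RDD[String]): DataFrame = {
        import spark.implicits._       // brings rdd.toDS() into scope
        // spark.read.json(rdd)        // deprecated since Spark 2.2.0
        spark.read.json(rdd.toDS())    // Dataset[String] overload, added in 2.2.0
      }
    }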


http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bb6e101..1989a77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Spark version -->
-    <spark.version>2.1.2</spark.version>
+    <spark.version>2.2.1</spark.version>
 
     <!-- MQTT Client -->
     <mqtt.paho.client>1.1.0</mqtt.paho.client>
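 
     (spark.version is a shared Maven property in the parent pom; modules that
     declare their Spark dependencies with ${spark.version} presumably pick up
     2.2.1 from this single edit.)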

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
index 4662b04..df00756 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
@@ -43,7 +43,7 @@ object CloudantStreaming {
 
       println(s"========= $time =========")// scalastyle:ignore
       // Convert RDD[String] to Dataset[String]
-      val changesDataFrame = spark.read.json(rdd)
+      val changesDataFrame = spark.read.json(rdd.toDS())
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.printSchema()
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
index 8c347bf..05eca9b 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
@@ -49,7 +49,7 @@ object CloudantStreamingSelector {
       // Get the singleton instance of SQLContext
 
       println(s"========= $time =========") // scalastyle:ignore
-      val changesDataFrame = spark.read.json(rdd)
+      val changesDataFrame = spark.read.json(rdd.toDS())
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.select("*").show()
        batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0)

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index ee071d0..47643cc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -58,7 +58,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
 
       logger.info("buildScan:" + columns + "," + origFilters)
       val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-      val df = sqlContext.read.json(cloudantRDD)
+      val df = sqlContext.read.json(cloudantRDD.toDS())
       if (colsLength > 1) {
        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
         df.select(requiredColumns(0), colsExceptCol0: _*).rdd
@@ -115,13 +115,13 @@ class DefaultSource extends RelationProvider
           config.viewPath == null
           && config.indexPath == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          dataFrame = sqlContext.read.json(cloudantRDD)
+          dataFrame = sqlContext.read.json(cloudantRDD.toDS())
           dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
             dataAccess.getMany(config.getSchemaSampleSize))
-          sqlContext.read.json(aRDD)
+          sqlContext.read.json(aRDD.toDS())
         }
         df.schema
       } else {
@@ -147,7 +147,7 @@ class DefaultSource extends RelationProvider
             globalRDD = rdd ++ globalRDD
           } else {
             // Convert final global RDD[String] to DataFrame
-            dataFrame = sqlContext.sparkSession.read.json(globalRDD)
+            dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
             ssc.stop(stopSparkContext = false, stopGracefully = false)
           }
         })
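
For reference, the second DefaultSource hunk infers a schema from a bounded sample of documents (config.getSchemaSampleSize) rather than the whole store. A hedged sketch of that pattern under the new API, with fetchSamples as a hypothetical stand-in for JsonStoreDataAccess.getMany:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    object SchemaSampleSketch {
      def inferSchema(spark: SparkSession,
                      fetchSamples: Int => Seq[String],
                      sampleSize: Int): StructType = {
        import spark.implicits._
        // Parallelize only the sampled JSON documents, then let Spark infer
        // the schema from that sample via the Dataset[String] json() overload.
        val sampleRDD = spark.sparkContext.parallelize(fetchSamples(sampleSize))
        spark.read.json(sampleRDD.toDS()).schema
      }
    }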
