Repository: bahir
Updated Branches:
  refs/heads/master b36632a1a -> a5ea67906

Set Spark version to 2.2.1 preparing for release

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a5ea6790
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a5ea6790
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a5ea6790

Branch: refs/heads/master
Commit: a5ea6790620a2c1bad85561e4142efd05e9bfdce
Parents: b36632a
Author: Luciano Resende <lrese...@apache.org>
Authored: Wed Jun 6 15:05:37 2018 +0200
Committer: Luciano Resende <lrese...@apache.org>
Committed: Wed Jun 6 15:14:00 2018 +0200

----------------------------------------------------------------------
 pom.xml                                                       | 2 +-
 .../spark/examples/sql/cloudant/CloudantStreaming.scala       | 2 +-
 .../examples/sql/cloudant/CloudantStreamingSelector.scala     | 2 +-
 .../main/scala/org/apache/bahir/cloudant/DefaultSource.scala  | 8 ++++----
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bb6e101..1989a77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
     <log4j.version>1.2.17</log4j.version>
     <!-- Spark version -->
-    <spark.version>2.1.2</spark.version>
+    <spark.version>2.2.1</spark.version>
     <!-- MQTT Client -->
     <mqtt.paho.client>1.1.0</mqtt.paho.client>

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
index 4662b04..df00756 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
@@ -43,7 +43,7 @@ object CloudantStreaming {
       println(s"========= $time =========")// scalastyle:ignore
       // Convert RDD[String] to Dataset[String]
-      val changesDataFrame = spark.read.json(rdd)
+      val changesDataFrame = spark.read.json(rdd.toDS())
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.printSchema()

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
index 8c347bf..05eca9b 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
@@ -49,7 +49,7 @@ object CloudantStreamingSelector {
       // Get the singleton instance of SQLContext
       println(s"========= $time =========") // scalastyle:ignore
-      val changesDataFrame = spark.read.json(rdd)
+      val changesDataFrame = spark.read.json(rdd.toDS())
       if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.select("*").show()
         batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0)

http://git-wip-us.apache.org/repos/asf/bahir/blob/a5ea6790/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index ee071d0..47643cc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -58,7 +58,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
       logger.info("buildScan:" + columns + "," + origFilters)
       val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-      val df = sqlContext.read.json(cloudantRDD)
+      val df = sqlContext.read.json(cloudantRDD.toDS())
       if (colsLength > 1) {
         val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
         df.select(requiredColumns(0), colsExceptCol0: _*).rdd
@@ -115,13 +115,13 @@ class DefaultSource extends RelationProvider
               config.viewPath == null && config.indexPath == null) {
             val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-            dataFrame = sqlContext.read.json(cloudantRDD)
+            dataFrame = sqlContext.read.json(cloudantRDD.toDS())
             dataFrame
           } else {
             val dataAccess = new JsonStoreDataAccess(config)
             val aRDD = sqlContext.sparkContext.parallelize(
               dataAccess.getMany(config.getSchemaSampleSize))
-            sqlContext.read.json(aRDD)
+            sqlContext.read.json(aRDD.toDS())
           }
           df.schema
         } else {
@@ -147,7 +147,7 @@ class DefaultSource extends RelationProvider
             globalRDD = rdd ++ globalRDD
           } else {
             // Convert final global RDD[String] to DataFrame
-            dataFrame = sqlContext.sparkSession.read.json(globalRDD)
+            dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
             ssc.stop(stopSparkContext = false, stopGracefully = false)
           }
         })