Repository: bahir Updated Branches: refs/heads/master 0e1505a89 -> eae02f29e
[BAHIR-138] Fix deprecated warnings in sql-cloudant. Fix warnings in the DefaultSource class, and in the CloudantStreaming and CloudantStreamingSelector examples. How: - Imported spark.implicits._ to convert a Spark RDD to a Dataset. - Replaced the deprecated json(RDD[String]) with json(Dataset[String]). - Improved the streaming examples: replaced registerTempTable with the preferred createOrReplaceTempView; replaced !isEmpty with nonEmpty; used a publicly accessible sales database so users can run the examples without any setup. - Fixed the error message reported when stopping tests by adding logic to the streaming receiver so it does not store documents in Spark memory once the stream has stopped. Closes #59 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/eae02f29 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/eae02f29 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/eae02f29 Branch: refs/heads/master Commit: eae02f29eb011f50bc313714e6cde62ce65804c4 Parents: 0e1505a Author: Esteban Laver <[email protected]> Authored: Mon Oct 2 16:18:40 2017 -0400 Committer: Luciano Resende <[email protected]> Committed: Wed Dec 20 08:49:02 2017 -0800 ---------------------------------------------------------------------- .../sql/cloudant/CloudantStreaming.scala | 73 ++++++++------------ .../cloudant/CloudantStreamingSelector.scala | 26 ++++--- .../bahir/cloudant/CloudantReceiver.scala | 4 +- 3 files changed, 47 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/eae02f29/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala 
b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala index a1de696..df00756 
100644 --- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala @@ -14,86 +14,71 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.examples.sql.cloudant import org.apache.spark.rdd.RDD -import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession -import org.apache.spark.streaming.{ Seconds, StreamingContext, Time } -import org.apache.spark.streaming.scheduler.{ StreamingListener, StreamingListenerReceiverError} +import org.apache.spark.streaming.{Seconds, StreamingContext, Time} import org.apache.bahir.cloudant.CloudantReceiver object CloudantStreaming { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala") + val spark = SparkSession.builder() + .appName("Cloudant Spark SQL External Datasource in Scala") + .master("local[*]") + .getOrCreate() + // Create the context with a 10 seconds batch size - val ssc = new StreamingContext(sparkConf, Seconds(10)) + val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) + import spark.implicits._ - val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map( - "cloudant.host" -> "ACCOUNT.cloudant.com", - "cloudant.username" -> "USERNAME", - "cloudant.password" -> "PASSWORD", - "database" -> "n_airportcodemapping"))) + val changes = ssc.receiverStream(new CloudantReceiver(spark.sparkContext.getConf, Map( + "cloudant.host" -> "examples.cloudant.com", + "database" -> "sales"))) changes.foreachRDD((rdd: RDD[String], time: Time) => { // Get the singleton instance of SparkSession - val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) + println(s"========= $time =========")// scalastyle:ignore - // Convert RDD[String] to DataFrame - val changesDataFrame = spark.read.json(rdd) 
- if (!changesDataFrame.schema.isEmpty) { + // Convert RDD[String] to Dataset[String] + val changesDataFrame = spark.read.json(rdd.toDS()) + if (changesDataFrame.schema.nonEmpty) { changesDataFrame.printSchema() - changesDataFrame.select("*").show() var hasDelRecord = false - var hasAirportNameField = false + var hasMonth = false for (field <- changesDataFrame.schema.fieldNames) { if ("_deleted".equals(field)) { hasDelRecord = true } - if ("airportName".equals(field)) { - hasAirportNameField = true + if ("month".equals(field)) { + hasMonth = true } } if (hasDelRecord) { changesDataFrame.filter(changesDataFrame("_deleted")).select("*").show() } - if (hasAirportNameField) { - changesDataFrame.filter(changesDataFrame("airportName") >= "Paris").select("*").show() - changesDataFrame.registerTempTable("airportcodemapping") - val airportCountsDataFrame = + if (hasMonth) { + changesDataFrame.filter(changesDataFrame("month") === "May").select("*").show(5) + changesDataFrame.createOrReplaceTempView("sales") + val salesInMayCountsDataFrame = spark.sql( - s""" - |select airportName, count(*) as total - |from airportcodemapping - |group by airportName + s""" + |select rep, amount + |from sales + |where month = "May" """.stripMargin) - airportCountsDataFrame.show() + salesInMayCountsDataFrame.show(5) } } }) ssc.start() - // run streaming for 120 secs - Thread.sleep(120000L) + // run streaming for 60 secs + Thread.sleep(60000L) ssc.stop(true) } } - -/** Lazily instantiated singleton instance of SparkSession */ -object SparkSessionSingleton { - @transient private var instance: SparkSession = _ - def getInstance(sparkConf: SparkConf): SparkSession = { - if (instance == null) { - instance = SparkSession - .builder - .config(sparkConf) - .getOrCreate() - } - instance - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/eae02f29/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala 
---------------------------------------------------------------------- diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala index 51d939a..05eca9b 100644 --- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala +++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala @@ -20,7 +20,6 @@ package org.apache.spark.examples.sql.cloudant import java.util.concurrent.atomic.AtomicLong import org.apache.spark.rdd.RDD -import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{ Seconds, StreamingContext, Time } @@ -28,34 +27,39 @@ import org.apache.bahir.cloudant.CloudantReceiver object CloudantStreamingSelector { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala") + val spark = SparkSession.builder() + .appName("Cloudant Spark SQL External Datasource in Scala") + .master("local[*]") + .getOrCreate() + + import spark.implicits._ // Create the context with a 10 seconds batch size - val ssc = new StreamingContext(sparkConf, Seconds(10)) + val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) val curTotalAmount = new AtomicLong(0) val curSalesCount = new AtomicLong(0) var batchAmount = 0L - val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map( - "cloudant.host" -> "ACCOUNT.cloudant.com", - "cloudant.username" -> "USERNAME", - "cloudant.password" -> "PASSWORD", + val changes = ssc.receiverStream(new CloudantReceiver(spark.sparkContext.getConf, Map( + "cloudant.host" -> "examples.cloudant.com", "database" -> "sales", "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}"))) changes.foreachRDD((rdd: RDD[String], time: Time) => { // Get the 
singleton instance of SQLContext - val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) + println(s"========= $time =========") // scalastyle:ignore - val changesDataFrame = spark.read.json(rdd) - if (!changesDataFrame.schema.isEmpty) { + val changesDataFrame = spark.read.json(rdd.toDS()) + if (changesDataFrame.schema.nonEmpty) { changesDataFrame.select("*").show() batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0) curSalesCount.getAndAdd(changesDataFrame.count()) curTotalAmount.getAndAdd(batchAmount) println("Current sales count:" + curSalesCount)// scalastyle:ignore println("Current total amount:" + curTotalAmount)// scalastyle:ignore - } + } else { + ssc.stop() + } }) ssc.start() http://git-wip-us.apache.org/repos/asf/bahir/blob/eae02f29/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala index c6bae2e..60a7d4a 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala @@ -74,7 +74,9 @@ class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String] var doc = "" if(jsonDoc != null) { doc = Json.stringify(jsonDoc) - store(doc) + if(!isStopped() && doc.nonEmpty) { + store(doc) + } } } })
