Update website with Bahir Extensions for Spark 2.2.0 release
Project: http://git-wip-us.apache.org/repos/asf/bahir-website/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-website/commit/feca5587 Tree: http://git-wip-us.apache.org/repos/asf/bahir-website/tree/feca5587 Diff: http://git-wip-us.apache.org/repos/asf/bahir-website/diff/feca5587 Branch: refs/heads/master Commit: feca558749d7db51c640adc75321a39a1dd745c7 Parents: a31e910 Author: Luciano Resende <[email protected]> Authored: Wed Sep 6 16:54:50 2017 -0700 Committer: Luciano Resende <[email protected]> Committed: Wed Sep 6 16:54:50 2017 -0700 ---------------------------------------------------------------------- site/_data/project.yml | 4 +- site/docs/spark/2.0.0/documentation.md | 2 +- site/docs/spark/2.0.1/documentation.md | 2 +- site/docs/spark/2.0.2/documentation.md | 2 +- site/docs/spark/2.1.0/documentation.md | 2 +- site/docs/spark/2.1.1/documentation.md | 2 +- site/docs/spark/2.2.0/documentation.md | 58 ++++ site/docs/spark/2.2.0/spark-sql-cloudant.md | 316 +++++++++++++++++++ .../spark/2.2.0/spark-sql-streaming-akka.md | 137 ++++++++ .../spark/2.2.0/spark-sql-streaming-mqtt.md | 146 +++++++++ site/docs/spark/2.2.0/spark-streaming-akka.md | 89 ++++++ site/docs/spark/2.2.0/spark-streaming-mqtt.md | 98 ++++++ site/docs/spark/2.2.0/spark-streaming-pubsub.md | 71 +++++ .../docs/spark/2.2.0/spark-streaming-twitter.md | 74 +++++ site/docs/spark/2.2.0/spark-streaming-zeromq.md | 65 ++++ site/docs/spark/overview.md | 3 +- 16 files changed, 1063 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/_data/project.yml ---------------------------------------------------------------------- diff --git a/site/_data/project.yml b/site/_data/project.yml index c3e3c8f..47ecbfc 100644 --- a/site/_data/project.yml +++ b/site/_data/project.yml @@ -58,8 +58,8 @@ podling: false spark_unix_name: bahir spark_github_project_name: bahir -spark_latest_release: 
2.1.1 -spark_latest_release_date: 07/11/2017 +spark_latest_release: 2.2.0 +spark_latest_release_date: 08/22/2017 spark_latest_release_location: http://www.apache.org/dist/bahir/bahir-spark spark_latest_release_location_mirror: http://www.apache.org/dyn/closer.lua/bahir/bahir-spark spark_download: /downloads/spark http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.0.0/documentation.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.0.0/documentation.md b/site/docs/spark/2.0.0/documentation.md index 38148e9..6c544b2 100644 --- a/site/docs/spark/2.0.0/documentation.md +++ b/site/docs/spark/2.0.0/documentation.md @@ -25,7 +25,7 @@ limitations under the License. {% include JB/setup %} -### Apache Bahir Extensions for Apache Spark +### Apache Bahir Extensions for Apache Spark 2.0.0 <br/> http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.0.1/documentation.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.0.1/documentation.md b/site/docs/spark/2.0.1/documentation.md index 38148e9..597b726 100644 --- a/site/docs/spark/2.0.1/documentation.md +++ b/site/docs/spark/2.0.1/documentation.md @@ -25,7 +25,7 @@ limitations under the License. {% include JB/setup %} -### Apache Bahir Extensions for Apache Spark +### Apache Bahir Extensions for Apache Spark 2.0.1 <br/> http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.0.2/documentation.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.0.2/documentation.md b/site/docs/spark/2.0.2/documentation.md index 38148e9..1714900 100644 --- a/site/docs/spark/2.0.2/documentation.md +++ b/site/docs/spark/2.0.2/documentation.md @@ -25,7 +25,7 @@ limitations under the License. 
{% include JB/setup %} -### Apache Bahir Extensions for Apache Spark +### Apache Bahir Extensions for Apache Spark 2.0.2 <br/> http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.1.0/documentation.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.1.0/documentation.md b/site/docs/spark/2.1.0/documentation.md index 38148e9..0b8b9d7 100644 --- a/site/docs/spark/2.1.0/documentation.md +++ b/site/docs/spark/2.1.0/documentation.md @@ -25,7 +25,7 @@ limitations under the License. {% include JB/setup %} -### Apache Bahir Extensions for Apache Spark +### Apache Bahir Extensions for Apache Spark 2.1.0 <br/> http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.1.1/documentation.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.1.1/documentation.md b/site/docs/spark/2.1.1/documentation.md index 7eb03c6..d122652 100644 --- a/site/docs/spark/2.1.1/documentation.md +++ b/site/docs/spark/2.1.1/documentation.md @@ -25,7 +25,7 @@ limitations under the License. {% include JB/setup %} -### Apache Bahir Extensions for Apache Spark +### Apache Bahir Extensions for Apache Spark 2.1.1 <br/> http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/documentation.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/documentation.md b/site/docs/spark/2.2.0/documentation.md new file mode 100644 index 0000000..22130db --- /dev/null +++ b/site/docs/spark/2.2.0/documentation.md @@ -0,0 +1,58 @@ +--- +layout: page +title: Extensions for Apache Spark +description: Extensions for Apache Spark +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. 
+The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} + +### Apache Bahir Extensions for Apache Spark 2.2.0 + +<br/> + +#### SQL Data Sources + +[Apache CouchDB/Cloudant data source](../spark-sql-cloudant) {:height="36px" width="36px"} + +<br/> + +#### Structured Streaming Data Sources + +[Akka data source](../spark-sql-streaming-akka) {:height="36px" width="36px"} + +[MQTT data source](../spark-sql-streaming-mqtt) + +<br/> + +#### Discretized Streams (DStreams) Connectors + +[Apache CouchDB/Cloudant connector](../spark-sql-cloudant) {:height="36px" width="36px"} + +[Akka connector](../spark-streaming-akka) + +[Google Cloud Pub/Sub connector](../spark-streaming-pubsub) {:height="36px" width="36px"} + +[MQTT connector](../spark-streaming-mqtt) + +[Twitter connector](../spark-streaming-twitter) + +[ZeroMQ connector](../spark-streaming-zeromq) http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-sql-cloudant.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-sql-cloudant.md b/site/docs/spark/2.2.0/spark-sql-cloudant.md new file mode 100644 index 0000000..984eb2b --- /dev/null +++ b/site/docs/spark/2.2.0/spark-sql-cloudant.md @@ -0,0 +1,316 @@ +--- +layout: page +title: Spark Data Source for Apache CouchDB/Cloudant +description: Spark Data Source for Apache CouchDB/Cloudant +group: nav-right +--- +<!-- +{% comment %} 
+Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} +A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming. + +[IBM® Cloudant®](https://cloudant.com) is a document-oriented Database as a Service (DBaaS). It stores data as documents +in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a +wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and +geospatial indexing. The replication capabilities make it easy to keep data in sync between database +clusters, desktop PCs, and mobile devices. + +[Apache CouchDB™](http://couchdb.apache.org) is open source database software that focuses on ease of use and on having an architecture that "completely embraces the Web". It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for its API.
+ +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-cloudant_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. + + $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +Submit a job in Python: + + spark-submit --master local[4] --jars <path to cloudant-spark.jar> <path to python script> + +Submit a job in Scala: + + spark-submit --class "<your class>" --master local[4] --jars <path to cloudant-spark.jar> <path to your app jar> + +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. + + +## Configuration options +The configuration is obtained in the following sequence: + +1. default value in the Config, which is set in `application.conf` +2. key in the SparkConf, which is set in `SparkConf` +3. key in the parameters, which is set in the DataFrame or temporary table options +4. "spark."+key in the SparkConf (these are treated as values passed in through `spark-submit` using the `--conf` option) + +Each subsequent configuration overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in SparkConf, and configuration passed in `spark-submit` using `--conf` takes precedence over any setting in the code. + + +### Configuration in application.conf +Default values are defined [here](cloudant-spark-sql/src/main/resources/application.conf).
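The precedence rules above can be sketched in plain Python. This is an illustration only, assuming each configuration layer is available as a simple dictionary; `resolve_option` and its parameters are hypothetical names, not part of the Bahir API, and the connector's actual lookup code may differ.

```python
def resolve_option(key, defaults, spark_conf, options):
    """Return the effective value of `key`; later layers override earlier ones.

    Hypothetical helper: `defaults` stands in for application.conf,
    `spark_conf` for the SparkConf entries, and `options` for the
    DataFrame or temporary table OPTIONS.
    """
    value = defaults.get(key)            # 1. default from application.conf
    if key in spark_conf:                # 2. plain key in SparkConf
        value = spark_conf[key]
    if key in options:                   # 3. DataFrame / temp table option
        value = options[key]
    prefixed = "spark." + key            # 4. "spark." + key, e.g. from --conf
    if prefixed in spark_conf:
        value = spark_conf[prefixed]
    return value


defaults = {"cloudant.protocol": "https", "bulkSize": "200"}
spark_conf = {"bulkSize": "500", "spark.cloudant.protocol": "http"}
options = {"bulkSize": "1000", "cloudant.protocol": "https"}

print(resolve_option("bulkSize", defaults, spark_conf, options))
print(resolve_option("cloudant.protocol", defaults, spark_conf, options))
```

In this sketch the `spark.`-prefixed entry wins even over an explicit DataFrame option, mirroring the statement that `--conf` settings take precedence over any setting in the code.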
+ +### Configuration on SparkConf + +Name | Default | Meaning +--- |:---:| --- +cloudant.protocol|https|protocol to use to transfer data: http or https +cloudant.host||cloudant host url +cloudant.username||cloudant userid +cloudant.password||cloudant password +cloudant.useQuery|false|By default, the `_all_docs` endpoint is used if the configurations 'view' and 'index' (see below) are not set. When useQuery is enabled, the `_find` endpoint will be used in place of `_all_docs` when the query condition is not on the primary key field (`_id`), so that query predicates may be pushed down into the datastore. +cloudant.queryLimit|25|The maximum number of results returned when querying the `_find` endpoint. +jsonstore.rdd.partitions|10|the intended number of partitions used to drive JsonStoreRDD when loading query results in parallel. The actual number is calculated based on the total rows returned while satisfying maxInPartition and minInPartition +jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited +jsonstore.rdd.minInPartition|10|the min rows in a partition. +jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds +bulkSize|200| the bulk save size +schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means only the first document is used for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs +createDBOnSave|"false"| whether to create a new database during a save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with the provided name already exists, an error will be raised.
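The `schemaSampleSize` rules in the table can be restated as a small function. This sketch only encodes the documented semantics (`-1` means all documents, `0` is treated as `1`, any other `N` means `min(N, total)`); `effective_sample_size` is a hypothetical name, and rejecting values below `-1` is an assumption, since the table does not say how they are handled.

```python
def effective_sample_size(schema_sample_size, total_docs):
    """Number of documents used for schema discovery (hypothetical helper)."""
    if schema_sample_size < -1:
        # Assumption: the documentation does not define values below -1.
        raise ValueError("schemaSampleSize must be -1, 0, or a positive integer")
    if schema_sample_size == -1:
        return total_docs                       # -1: scan all documents
    if schema_sample_size == 0:
        schema_sample_size = 1                  # 0 is treated as 1
    return min(schema_sample_size, total_docs)  # N: min(N, total)


for n in (-1, 0, 1, 50, 5000):
    print(n, effective_sample_size(n, 1200))
```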
+ +### Configuration on Spark SQL Temporary Table or DataFrame + +Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in a temporary table or dataframe using OPTIONS: + +Name | Default | Meaning +--- |:---:| --- +database||cloudant database name +view||cloudant view without the database name. Only used for loading. +index||cloudant search index without the database name. Only used for loading data with 200 or fewer results. +path||cloudant: used as the database name if `database` is not present +schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents +bulkSize|200| the bulk save size +createDBOnSave|"false"| whether to create a new database during a save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with the provided name already exists, an error will be raised. + +For fast loading, views are loaded without `include_docs`. Thus, a derived schema will always be `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view: + +```python +spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')") + +``` + +### Configuration on Cloudant Receiver for Spark Streaming + +Name | Default | Meaning +--- |:---:| --- +cloudant.host||cloudant host url +cloudant.username||cloudant userid +cloudant.password||cloudant password +database||cloudant database name +selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark. + + +### Configuration in spark-submit using --conf option + +The configuration keys above can also be set using the `spark-submit --conf` option.
When passing configuration in spark-submit, make sure to add "spark." as a prefix to the keys. + + +## Examples + +### Python API + +#### Using SQL In Python + +```python +from pyspark.sql import SparkSession + +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using temp tables")\ + .config("cloudant.host","ACCOUNT.cloudant.com")\ + .config("cloudant.username", "USERNAME")\ + .config("cloudant.password","PASSWORD")\ + .getOrCreate() + + +# Loading temp table from Cloudant db +spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')") +airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id") +airportData.printSchema() +print('Total # of rows in airportData: ' + str(airportData.count())) +for code in airportData.collect(): + print(code._id) +``` + +See [CloudantApp.py](examples/python/CloudantApp.py) for examples. + +Submit job example: +``` +spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py +``` + +#### Using DataFrame In Python + +```python +from pyspark.sql import SparkSession + +spark = SparkSession\ + .builder\ + .appName("Cloudant Spark SQL Example in Python using dataframes")\ + .config("cloudant.host","ACCOUNT.cloudant.com")\ + .config("cloudant.username", "USERNAME")\ + .config("cloudant.password","PASSWORD")\ + .config("jsonstore.rdd.partitions", 8)\ + .getOrCreate() + +# 1. Loading dataframe from Cloudant db +df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant") +df.cache() +df.printSchema() +df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show() +df.filter(df._id >= 'CAA').select("_id",'airportName').show() +``` + +See [CloudantDF.py](examples/python/CloudantDF.py) for examples.
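The submit job example in the Python section passes each Cloudant setting through `--conf` with the `spark.` prefix. When those settings live in a dictionary, the argument list can be assembled programmatically. The sketch below uses a hypothetical helper, `build_submit_args`, which is not part of Bahir or Spark. It only builds and prints the command-line arguments and does not launch anything.

```python
import shlex


def build_submit_args(script, package, cloudant_conf):
    """Build a spark-submit argument list, prefixing each key with "spark."."""
    args = ["spark-submit", "--packages", package]
    for key, value in sorted(cloudant_conf.items()):
        # Keys given via --conf need the "spark." prefix; keep it if present.
        conf_key = key if key.startswith("spark.") else "spark." + key
        args += ["--conf", f"{conf_key}={value}"]
    args.append(script)
    return args


args = build_submit_args(
    "sql-cloudant/examples/python/CloudantApp.py",
    "org.apache.bahir:spark-sql-cloudant_2.11:2.2.0",
    {
        "cloudant.host": "ACCOUNT.cloudant.com",
        "cloudant.username": "USERNAME",
        "cloudant.password": "PASSWORD",
    },
)
print(" ".join(shlex.quote(a) for a in args))
```

Passing the resulting list to `subprocess.run(args)` would start the job. Printing the list first is a simple way to check the flags.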
+ +If you perform multiple operations on a dataframe (select, filter, etc.), +you should persist the dataframe. Otherwise, every operation on it will load the same data from Cloudant again. +Persisting will also speed up computation. This statement will persist the dataframe in memory: `df.cache()`. Alternatively, for large databases, persist in memory and on disk with: + +```python +from pyspark import StorageLevel +df.persist(storageLevel = StorageLevel(True, True, False, True, 1)) +``` + +[Sample code](examples/python/CloudantDFOption.py) on using DataFrame options to define Cloudant configuration + +### Scala API + +#### Using SQL In Scala + +```scala +import org.apache.spark.sql.SparkSession + +val spark = SparkSession + .builder() + .appName("Cloudant Spark SQL Example") + .config("cloudant.host","ACCOUNT.cloudant.com") + .config("cloudant.username", "USERNAME") + .config("cloudant.password","PASSWORD") + .getOrCreate() + +// Implicit encoders, needed by airportData.map below +import spark.implicits._ + +// create a temp table from Cloudant db and query it using sql syntax +spark.sql( + s""" + |CREATE TEMPORARY TABLE airportTable + |USING org.apache.bahir.cloudant + |OPTIONS ( database 'n_airportcodemapping') + """.stripMargin) +// create a dataframe +val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id") +airportData.printSchema() +println(s"Total # of rows in airportData: ${airportData.count()}") +// format each Row as a string, collect the results to the driver, and print them +airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println) +``` +See [CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala) for examples.
+ +Submit job example: +``` +spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD /path/to/spark-sql-cloudant_2.11-2.2.0-tests.jar +``` + +#### Using DataFrame In Scala + +```scala +import org.apache.spark.sql.SparkSession + +val spark = SparkSession + .builder() + .appName("Cloudant Spark SQL Example with Dataframe") + .config("cloudant.host","ACCOUNT.cloudant.com") + .config("cloudant.username", "USERNAME") + .config("cloudant.password","PASSWORD") + .config("createDBOnSave","true") // to create a db on save + .config("jsonstore.rdd.partitions", "20") // using 20 partitions + .getOrCreate() + +// 1. Loading data from Cloudant db +val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") +// Cache df in memory to speed up computations +// and to avoid retrieving the data from Cloudant again +df.cache() +df.printSchema() + +// 2. Saving dataframe to Cloudant db +val df2 = df.filter(df("flightSegmentId") === "AA106") + .select("flightSegmentId","economyClassBaseCost") +df2.show() +df2.write.format("org.apache.bahir.cloudant").save("n_flight2") +``` + +See [CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) for examples. + +[Sample code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) on using DataFrame options to define Cloudant configuration.
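The Scala DataFrame example saves to `n_flight2` with `createDBOnSave` set to `"true"`. The documented `createDBOnSave` rules can be summarized as a small decision function. This is a plain-Python sketch of the documented behaviour only: `check_save_target` and the set of existing database names are hypothetical, treating the "should already exist" case as an error is an assumption, and the connector performs its own checks against Cloudant.

```python
def check_save_target(db_name, existing_dbs, create_db_on_save):
    """Return the action a save would take under the createDBOnSave rules."""
    exists = db_name in existing_dbs
    if create_db_on_save:
        if exists:
            # createDBOnSave=true and the database already exists: error
            raise ValueError(f"database {db_name!r} already exists")
        return "create"  # a new database will be created
    if not exists:
        # createDBOnSave=false: assumed error, the database should already exist
        raise ValueError(f"database {db_name!r} does not exist")
    return "write"  # save into the existing database


existing = {"n_flight", "n_airportcodemapping"}
print(check_save_target("n_flight2", existing, create_db_on_save=True))
print(check_save_target("n_flight", existing, create_db_on_save=False))
```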
+ + +#### Using Streams In Scala + +```scala +import org.apache.bahir.cloudant.CloudantReceiver +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Seconds, StreamingContext, Time} + +val ssc = new StreamingContext(sparkConf, Seconds(10)) +val changes = ssc.receiverStream(new CloudantReceiver(Map( + "cloudant.host" -> "ACCOUNT.cloudant.com", + "cloudant.username" -> "USERNAME", + "cloudant.password" -> "PASSWORD", + "database" -> "n_airportcodemapping"))) + +changes.foreachRDD((rdd: RDD[String], time: Time) => { + // Get the singleton instance of SparkSession + // (SparkSessionSingleton is a helper object defined in the CloudantStreaming.scala example) + val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) + + println(s"========= $time =========") + // Convert RDD[String] to DataFrame + val changesDataFrame = spark.read.json(rdd) + if (!changesDataFrame.schema.isEmpty) { + changesDataFrame.printSchema() + changesDataFrame.select("*").show() + // ... further processing ... + } +}) +ssc.start() +// run streaming for 120 secs +Thread.sleep(120000L) +ssc.stop(true) + +``` + +See [CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala) for examples. + +By default, Spark Streaming will load all documents from a database.
If you want to limit the loading to +specific documents, use the `selector` option of `CloudantReceiver` and specify your conditions +(see the [CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala) +example for more details): + +```scala +val changes = ssc.receiverStream(new CloudantReceiver(Map( + "cloudant.host" -> "ACCOUNT.cloudant.com", + "cloudant.username" -> "USERNAME", + "cloudant.password" -> "PASSWORD", + "database" -> "sales", + "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}"))) +``` http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-sql-streaming-akka.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-sql-streaming-akka.md b/site/docs/spark/2.2.0/spark-sql-streaming-akka.md new file mode 100644 index 0000000..945ee71 --- /dev/null +++ b/site/docs/spark/2.2.0/spark-sql-streaming-akka.md @@ -0,0 +1,137 @@ +--- +layout: page +title: Spark Structured Streaming Akka +description: Spark Structured Streaming Akka +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+{% endcomment %} +--> + +{% include JB/setup %} +A library for reading data from Akka Actors using Spark SQL Streaming (or Structured Streaming). + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-streaming-akka_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. + +## Examples + +A SQL stream can be created with data streams received from an Akka Feeder actor using: + + sqlContext.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", "feederActorUri") + .load() + +## Enable recovering from failures + +Setting a value for the `persistenceDirPath` option helps in recovering in case of a restart, by restoring the state where it left off before the shutdown. + + sqlContext.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", "feederActorUri") + .option("persistenceDirPath", "/path/to/localdir") + .load() + +## Configuration options + +This source uses the [Akka Actor API](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html). + +* `urlOfPublisher` The URL of the Publisher or Feeder actor that the Receiver actor connects to. Set this to the TCP URL of the Publisher or Feeder actor.
+* `persistenceDirPath` Path of the local directory used for storing incoming messages on disk. + +### Scala API + +An example using the Scala API to count words from an incoming message stream. + + import java.sql.Timestamp + import spark.implicits._ // encoders for .as[...] and .map + + // Create DataFrame representing the stream of input lines from connection + // to publisher or feeder actor + val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", urlOfPublisher) + .load().as[(String, Timestamp)] + + // Split the lines into words + val words = lines.map(_._1).flatMap(_.split(" ")) + + // Generate running word count + val wordCounts = words.groupBy("value").count() + + // Start running the query that prints the running counts to the console + val query = wordCounts.writeStream + .outputMode("complete") + .format("console") + .start() + + query.awaitTermination() + +Please see `AkkaStreamWordCount.scala` for the full example. + +### Java API + +An example using the Java API to count words from an incoming message stream. + + // Create DataFrame representing the stream of input lines from connection + // to publisher or feeder actor + Dataset<String> lines = spark + .readStream() + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", urlOfPublisher) + .load().select("value").as(Encoders.STRING()); + + // Split the lines into words + Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterator<String> call(String s) throws Exception { + return Arrays.asList(s.split(" ")).iterator(); + } + }, Encoders.STRING()); + + // Generate running word count + Dataset<Row> wordCounts = words.groupBy("value").count(); + + // Start running the query that prints the running counts to the console + StreamingQuery query = wordCounts.writeStream() + .outputMode("complete") + .format("console") + .start(); + + query.awaitTermination(); + +Please see `JavaAkkaStreamWordCount.java` for the full example.
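To show what the `complete` output mode prints, the running word count from the Akka examples can be mimicked without Spark. The sketch below is a hypothetical, single-process illustration: each inner list stands for the lines received in one trigger. After every trigger the whole result table is emitted, as `outputMode("complete")` does. Like the examples, it splits on a single space.

```python
from collections import Counter


def running_word_counts(batches):
    """Yield the full word-count table after each micro-batch (complete mode)."""
    counts = Counter()
    for lines in batches:
        for line in lines:
            counts.update(line.split(" "))  # same split as the examples
        yield dict(counts)                  # complete mode: emit every row


batches = [["hello world"], ["hello spark", "spark"]]
for trigger, table in enumerate(running_word_counts(batches)):
    print(trigger, sorted(table.items()))
```

By contrast, Spark's `update` output mode would emit only the rows whose counts changed in that trigger.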
http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-sql-streaming-mqtt.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-sql-streaming-mqtt.md b/site/docs/spark/2.2.0/spark-sql-streaming-mqtt.md new file mode 100644 index 0000000..86fbe42 --- /dev/null +++ b/site/docs/spark/2.2.0/spark-sql-streaming-mqtt.md @@ -0,0 +1,146 @@ +--- +layout: page +title: Spark Structured Streaming MQTT +description: Spark Structured Streaming MQTT +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} + +A library for reading data from MQTT Servers using Spark SQL Streaming (or Structured Streaming). + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-streaming-mqtt_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. + +## Examples + +A SQL stream can be created with data streams received through an MQTT server using: + + sqlContext.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", "mytopic") + .load("tcp://localhost:1883") + +## Enable recovering from failures + +Setting values for the `localStorage` and `clientId` options helps in recovering in case of a restart, by restoring the state where it left off before the shutdown. + + sqlContext.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", "mytopic") + .option("localStorage", "/path/to/localdir") + .option("clientId", "some-client-id") + .load("tcp://localhost:1883") + +## Configuration options + +This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html). + + * `brokerUrl` The URL the MqttClient connects to. Set either this or `path` to the URL of the MQTT server, e.g. tcp://localhost:1883. + * `persistence` By default, incoming messages are stored on disk. If `memory` is provided as the value for this option, recovery on restart is not supported. + * `topic` The topic the MqttClient subscribes to. + * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client. + * `QoS` The maximum quality of service at which to subscribe to each topic. Messages published at a lower quality of service will be received at the published QoS.
Messages published at a higher quality of service will be received using the QoS specified in the subscription. + * `username` Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require it; setting it to an empty value will lead to errors. + * `password` Sets the password to use for the connection. + * `cleanSession` Setting it to true starts a clean session and removes all messages checkpointed by a previous run of this source. This is set to false by default. + * `connectionTimeout` Sets the connection timeout; a value of 0 is interpreted as waiting until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information. + * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`. + * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`. + +### Scala API + +An example using the Scala API to count words from an incoming message stream. + + import java.sql.Timestamp + import spark.implicits._ // encoders for .as[...] and .map + + // Create DataFrame representing the stream of input lines from connection to mqtt server + val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", topic) + .load(brokerUrl).as[(String, Timestamp)] + + // Split the lines into words + val words = lines.map(_._1).flatMap(_.split(" ")) + + // Generate running word count + val wordCounts = words.groupBy("value").count() + + // Start running the query that prints the running counts to the console + val query = wordCounts.writeStream + .outputMode("complete") + .format("console") + .start() + + query.awaitTermination() + +Please see `MQTTStreamWordCount.scala` for the full example. + +### Java API + +An example using the Java API to count words from an incoming message stream. + + // Create DataFrame representing the stream of input lines from connection to mqtt server.
+ Dataset<String> lines = spark + .readStream() + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", topic) + .load(brokerUrl).select("value").as(Encoders.STRING()); + + // Split the lines into words + Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterator<String> call(String x) { + return Arrays.asList(x.split(" ")).iterator(); + } + }, Encoders.STRING()); + + // Generate running word count + Dataset<Row> wordCounts = words.groupBy("value").count(); + + // Start running the query that prints the running counts to the console + StreamingQuery query = wordCounts.writeStream() + .outputMode("complete") + .format("console") + .start(); + + query.awaitTermination(); + +Please see `JavaMQTTStreamWordCount.java` for the full example. http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-streaming-akka.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-streaming-akka.md b/site/docs/spark/2.2.0/spark-streaming-akka.md new file mode 100644 index 0000000..da51d5b --- /dev/null +++ b/site/docs/spark/2.2.0/spark-streaming-akka.md @@ -0,0 +1,89 @@ +--- +layout: page +title: Spark Streaming Akka +description: Spark Streaming Akka +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License.
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} + +A library for reading data from Akka Actors using Spark Streaming. + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-akka_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above. + +## Examples + +DStreams can be created from data received through Akka actors by using `AkkaUtils.createStream(ssc, actorProps, actorName)`. + +### Scala API + +Extend `ActorReceiver` to store received data into Spark with the `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. + + class CustomActor extends ActorReceiver { + def receive = { + case data: String => store(data) + } + } + + // Create a new input stream with this custom actor + val ssc: StreamingContext = ...
+ val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") + +### Java API + +Extend `JavaActorReceiver` to store received data into Spark with the `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. + + class CustomActor extends JavaActorReceiver { + @Override + public void onReceive(Object msg) throws Exception { + store((String) msg); + } + } + + // Create a new input stream with this custom actor + JavaStreamingContext jssc = ...; + JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); + +See end-to-end examples at [Akka Examples](https://github.com/apache/bahir/tree/master/streaming-akka/examples) http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-streaming-mqtt.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-streaming-mqtt.md b/site/docs/spark/2.2.0/spark-streaming-mqtt.md new file mode 100644 index 0000000..9e9e2d2 --- /dev/null +++ b/site/docs/spark/2.2.0/spark-streaming-mqtt.md @@ -0,0 +1,98 @@ +--- +layout: page +title: Spark Streaming MQTT +description: Spark Streaming MQTT +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License.
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} + + +[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-mqtt_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above. + +## Configuration options + +This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html). + + * `brokerUrl` The URL the MqttClient connects to. Set this to the URL of the MQTT server.
e.g. `tcp://localhost:1883`. + * `storageLevel` The Spark storage level used for received messages. Defaults to `StorageLevel.MEMORY_AND_DISK_SER_2`. + * `topic` The topic the MqttClient subscribes to. + * `topics` The list of topics the MqttClient subscribes to. + * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client. + * `QoS` The maximum quality of service at which to subscribe to each topic. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe. + * `username` Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require it; setting it to an empty value leads to errors. + * `password` Sets the password to use for the connection. + * `cleanSession` Setting it to true starts a clean session and removes all messages checkpointed by a previous run of this source. This is set to false by default. + * `connectionTimeout` Sets the connection timeout; a value of 0 is interpreted as waiting until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information. + * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`. + * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`. + + +## Examples + +### Scala API + +Use `MQTTUtils.createStream` to create a DStream of messages from a single topic, or `MQTTUtils.createPairedStream` to create a DStream of (topic, message) pairs from several topics.
+ + val lines = MQTTUtils.createStream(ssc, brokerUrl, topic) + val pairs = MQTTUtils.createPairedStream(ssc, brokerUrl, topics) + +Additional MQTT connection options can be provided: + +```Scala +val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) +val pairs = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) +``` + +### Java API + +The Java API offers the same `createStream` (single topic) and `createPairedStream` (several topics, yielding (topic, message) pairs) methods, taking a `JavaStreamingContext`. + + JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic); + JavaReceiverInputDStream<Tuple2<String, String>> pairs = MQTTUtils.createPairedStream(jssc, brokerUrl, topics); + +See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples) http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-streaming-pubsub.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-streaming-pubsub.md b/site/docs/spark/2.2.0/spark-streaming-pubsub.md new file mode 100644 index 0000000..d4356d6 --- /dev/null +++ b/site/docs/spark/2.2.0/spark-streaming-pubsub.md @@ -0,0 +1,71 @@ +--- +layout: page +title: Spark Streaming Google Pub-Sub +description: Spark Streaming Google Pub-Sub +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} +A library for reading data from [Google Cloud Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming. + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-pubsub_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. 
+ +## Examples + +First, create a credential with `SparkGCPCredentials`. Four types of credentials are supported: + +* application default + `SparkGCPCredentials.builder.build()` +* JSON service account + `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()` +* P12 service account + `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()` +* metadata service account (when running on Dataproc) + `SparkGCPCredentials.builder.metadataServiceAccount().build()` + +### Scala API + + val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ..) + +### Java API + + JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...) + +See end-to-end examples at [Google Cloud Pubsub Examples](https://github.com/apache/bahir/tree/master/streaming-pubsub/examples) http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-streaming-twitter.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-streaming-twitter.md b/site/docs/spark/2.2.0/spark-streaming-twitter.md new file mode 100644 index 0000000..3bf44af --- /dev/null +++ b/site/docs/spark/2.2.0/spark-streaming-twitter.md @@ -0,0 +1,74 @@ +--- +layout: page +title: Spark Streaming Twitter +description: Spark Streaming Twitter +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License.
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} + +A library for reading social data from [Twitter](http://twitter.com/) using Spark Streaming. + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-twitter_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above. + + +## Examples + +`TwitterUtils` uses Twitter4J to get the public stream of tweets using [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information +can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by the Twitter4J library. You can import the `TwitterUtils` class and create a DStream with `TwitterUtils.createStream` as shown below.
+ +### Scala API + + import org.apache.spark.streaming.twitter._ + + TwitterUtils.createStream(ssc, None) + +### Java API + + import org.apache.spark.streaming.twitter.*; + + TwitterUtils.createStream(jssc); + + +You can get either the public stream or a stream filtered by keywords. +See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples) http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/2.2.0/spark-streaming-zeromq.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/2.2.0/spark-streaming-zeromq.md b/site/docs/spark/2.2.0/spark-streaming-zeromq.md new file mode 100644 index 0000000..62acbd1 --- /dev/null +++ b/site/docs/spark/2.2.0/spark-streaming-zeromq.md @@ -0,0 +1,65 @@ +--- +layout: page +title: Spark Streaming ZeroMQ +description: Spark Streaming ZeroMQ +group: nav-right +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +{% include JB/setup %} + +A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark Streaming.
+ +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-streaming-zeromq" % "2.2.0" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-zeromq_2.11</artifactId> + <version>2.2.0</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-streaming-zeromq_2.11:2.2.0 + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above. + +## Examples + + +### Scala API + + val lines = ZeroMQUtils.createStream(ssc, ...) + +### Java API + + JavaDStream<String> lines = ZeroMQUtils.createStream(jssc, ...); + +See end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples) http://git-wip-us.apache.org/repos/asf/bahir-website/blob/feca5587/site/docs/spark/overview.md ---------------------------------------------------------------------- diff --git a/site/docs/spark/overview.md b/site/docs/spark/overview.md index 7c52280..61cb9bb 100644 --- a/site/docs/spark/overview.md +++ b/site/docs/spark/overview.md @@ -27,7 +27,8 @@ limitations under the License. ### Apache Bahir Extensions for Apache Spark - - [Current - 2.2.0-SNAPSHOT](/docs/spark/current/documentation) + - [Current - 2.3.0-SNAPSHOT](/docs/spark/current/documentation) + - [2.2.0](/docs/spark/2.2.0/documentation) - [2.1.1](/docs/spark/2.1.1/documentation) - [2.1.0](/docs/spark/2.1.0/documentation) - [2.0.2](/docs/spark/2.0.2/documentation)
