Repository: spark
Updated Branches:
  refs/heads/master fc472bddd -> b8302ccd0


[SPARK-20015][SPARKR][SS][DOC][EXAMPLE] Document R Structured Streaming 
(experimental) in R vignettes and R & SS programming guide, R example

## What changes were proposed in this pull request?

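Add R Structured Streaming (experimental) documentation and an example:
- R vignettes
- R programming guide
- Structured Streaming (SS) programming guide
- R example (`structured_network_wordcount.R`)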

Also disable `spark.als` in the vignettes for now (its chunks are set to `eval=FALSE`), since it is failing (SPARK-20402).
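
For readers skimming the diff, the word-count examples added here all use the `complete` output mode, which re-emits the whole result table after every batch. The sketch below is not part of this commit and does not use Spark. It is a plain-Python simulation, with a hypothetical helper name `run_complete_mode`, of what that mode computes for the two sample batches shown in the programming guide.

```python
from collections import Counter


def run_complete_mode(batches):
    """Simulate a running word count with 'complete' output mode.

    `batches` is a list of micro-batches; each micro-batch is a list of
    input lines. After every batch, the full updated result table is
    emitted, not just the rows that changed.
    (Hypothetical helper for illustration only; not a Spark API.)
    """
    counts = Counter()  # state carried across batches
    outputs = []
    for lines in batches:
        for line in lines:
            # Same idea as explode(split(value, ' ')): one word per row
            counts.update(line.split(" "))
        outputs.append(dict(counts))  # snapshot of the whole table
    return outputs


if __name__ == "__main__":
    batches = [["apache spark"], ["apache hadoop"]]
    for i, table in enumerate(run_complete_mode(batches)):
        print(f"Batch: {i}", table)
```

The second snapshot still contains `spark` even though that word did not appear in the second batch, which is why the sample console output in the guide lists it again under `Batch: 1`.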

## How was this patch tested?

Manually.

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #17814 from felixcheung/rdocss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8302ccd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8302ccd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8302ccd

Branch: refs/heads/master
Commit: b8302ccd02265f9d7a7895c7b033441fa2d8ffd1
Parents: fc472bd
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Thu May 4 00:27:10 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Thu May 4 00:27:10 2017 -0700

----------------------------------------------------------------------
 R/pkg/vignettes/sparkr-vignettes.Rmd            |  79 ++++-
 docs/sparkr.md                                  |   4 +
 docs/structured-streaming-programming-guide.md  | 285 ++++++++++++++++---
 .../r/streaming/structured_network_wordcount.R  |  57 ++++
 4 files changed, 381 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8302ccd/R/pkg/vignettes/sparkr-vignettes.Rmd
----------------------------------------------------------------------
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd 
b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 4b9d6c3..d38ec4f 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -182,7 +182,7 @@ head(df)
 ```
 
 ### Data Sources
-SparkR supports operating on a variety of data sources through the 
`SparkDataFrame` interface. You can check the Spark SQL programming guide for 
more [specific 
options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options)
 that are available for the built-in data sources.
+SparkR supports operating on a variety of data sources through the 
`SparkDataFrame` interface. You can check the Spark SQL Programming Guide for 
more [specific 
options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options)
 that are available for the built-in data sources.
 
 The general method for creating `SparkDataFrame` from data sources is 
`read.df`. This method takes in the path for the file to load and the type of 
data source, and the currently active Spark Session will be used automatically. 
SparkR supports reading CSV, JSON and Parquet files natively and through Spark 
Packages you can find data source connectors for popular file formats like 
Avro. These packages can be added with `sparkPackages` parameter when 
initializing SparkSession using `sparkR.session`.
 
@@ -232,7 +232,7 @@ write.df(people, path = "people.parquet", source = 
"parquet", mode = "overwrite"
 ```
 
 ### Hive Tables
-You can also create SparkDataFrames from Hive tables. To do this we will need 
to create a SparkSession with Hive support which can access tables in the Hive 
MetaStore. Note that Spark should have been built with Hive support and more 
details can be found in the [SQL programming 
guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In 
SparkR, by default it will attempt to create a SparkSession with Hive support 
enabled (`enableHiveSupport = TRUE`).
+You can also create SparkDataFrames from Hive tables. To do this we will need 
to create a SparkSession with Hive support which can access tables in the Hive 
MetaStore. Note that Spark should have been built with Hive support and more 
details can be found in the [SQL Programming 
Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In 
SparkR, by default it will attempt to create a SparkSession with Hive support 
enabled (`enableHiveSupport = TRUE`).
 
 ```{r, eval=FALSE}
 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
@@ -314,7 +314,7 @@ Use `cube` or `rollup` to compute subtotals across multiple 
dimensions.
 mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
 ```
 
-generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, 
while 
+generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, 
while
 
 ```{r}
 mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
@@ -672,6 +672,7 @@ head(select(naiveBayesPrediction, "Class", "Sex", "Age", 
"Survived", "prediction
 Survival analysis studies the expected duration of time until an event 
happens, and often the relationship with risk factors or treatment taken on the 
subject. In contrast to standard regression analysis, survival modeling has to 
deal with special characteristics in the data including non-negative survival 
time and censoring.
 
 Accelerated Failure Time (AFT) model is a parametric survival model for 
censored data that assumes the effect of a covariate is to accelerate or 
decelerate the life course of an event by some constant. For more information, 
refer to the Wikipedia page [AFT 
Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the 
references there. Different from a [Proportional Hazards 
Model](https://en.wikipedia.org/wiki/Proportional_hazards_model) designed for 
the same purpose, the AFT model is easier to parallelize because each instance 
contributes to the objective function independently.
+
 ```{r, warning=FALSE}
 library(survival)
 ovarianDF <- createDataFrame(ovarian)
@@ -902,7 +903,7 @@ perplexity
 
 There are multiple options that can be configured in `spark.als`, including 
`rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.
 
-```{r}
+```{r, eval=FALSE}
 ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 
4.0),
                 list(2, 1, 1.0), list(2, 2, 5.0))
 df <- createDataFrame(ratings, c("user", "item", "rating"))
@@ -910,7 +911,7 @@ model <- spark.als(df, "rating", "user", "item", rank = 10, 
reg = 0.1, nonnegati
 ```
 
 Extract latent factors.
-```{r}
+```{r, eval=FALSE}
 stats <- summary(model)
 userFactors <- stats$userFactors
 itemFactors <- stats$itemFactors
@@ -920,7 +921,7 @@ head(itemFactors)
 
 Make predictions.
 
-```{r}
+```{r, eval=FALSE}
 predicted <- predict(model, df)
 head(predicted)
 ```
@@ -1002,6 +1003,72 @@ unlink(modelPath)
 ```
 
 
+## Structured Streaming
+
+SparkR supports the Structured Streaming API (experimental).
+
+You can check the Structured Streaming Programming Guide for [an 
introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model)
 to its programming model and basic concepts.
+
+### Simple Source and Sink
+
+Spark has a few built-in input sources. As an example, to test with a socket 
source reading text into words and displaying the computed word counts:
+
+```{r, eval=FALSE}
+# Create DataFrame representing the stream of input lines from connection
+lines <- read.stream("socket", host = hostname, port = port)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(groupBy(words, "word"))
+
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+```
+
+### Kafka Source
+
+It is simple to read data from Kafka. For more information, see [Input 
Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)
 supported by Structured Streaming.
+
+```{r, eval=FALSE}
+topic <- read.stream("kafka",
+                     kafka.bootstrap.servers = "host1:port1,host2:port2",
+                     subscribe = "topic1")
+keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
+```
+
+### Operations and Sinks
+
+Most of the common operations on `SparkDataFrame` are supported for streaming, 
including selection, projection, and aggregation. Once you have defined the 
final result, to start the streaming computation, you will call the 
`write.stream` method setting a sink and `outputMode`.
+
+A streaming `SparkDataFrame` can be written for debugging to the console, to a 
temporary in-memory table, or for further processing in a fault-tolerant manner 
to a File Sink in different formats.
+
+```{r, eval=FALSE}
+noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
+
+# Print new data to console
+write.stream(noAggDF, "console")
+
+# Write new data to Parquet files
+write.stream(noAggDF,
+             "parquet",
+             path = "path/to/destination/dir",
+             checkpointLocation = "path/to/checkpoint/dir")
+
+# Aggregate
+aggDF <- count(groupBy(noAggDF, "device"))
+
+# Print updated aggregations to console
+write.stream(aggDF, "console", outputMode = "complete")
+
+# Have all the aggregates in an in memory table. The query name will be the 
table name
+write.stream(aggDF, "memory", queryName = "aggregates", outputMode = 
"complete")
+
+head(sql("select * from aggregates"))
+```
+
+
 ## Advanced Topics
 
 ### SparkR Object Classes

http://git-wip-us.apache.org/repos/asf/spark/blob/b8302ccd/docs/sparkr.md
----------------------------------------------------------------------
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 6dbd02a..569b85e 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -593,6 +593,10 @@ The following example shows how to save/load a MLlib model 
by SparkR.
 </tr>
 </table>
 
+# Structured Streaming
+
+SparkR supports the Structured Streaming API (experimental). Structured 
Streaming is a scalable and fault-tolerant stream processing engine built on 
the Spark SQL engine. For more information see the R API on the [Structured 
Streaming Programming Guide](structured-streaming-programming-guide.html)
+
 # R Function Name Conflicts
 
 When loading and attaching a new package in R, it is possible to have a name 
[conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html),
 where a

http://git-wip-us.apache.org/repos/asf/spark/blob/b8302ccd/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 5b18cf2..53b3db2 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -8,13 +8,13 @@ title: Structured Streaming Programming Guide
 {:toc}
 
 # Overview
-Structured Streaming is a scalable and fault-tolerant stream processing engine 
built on the Spark SQL engine. You can express your streaming computation the 
same way you would express a batch computation on static data. The Spark SQL 
engine will take care of running it incrementally and continuously and updating 
the final result as streaming data continues to arrive. You can use the 
[Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to 
express streaming aggregations, event-time windows, stream-to-batch joins, etc. 
The computation is executed on the same optimized Spark SQL engine. Finally, 
the system ensures end-to-end exactly-once fault-tolerance guarantees through 
checkpointing and Write Ahead Logs. In short, *Structured Streaming provides 
fast, scalable, fault-tolerant, end-to-end exactly-once stream processing 
without the user having to reason about streaming.*
+Structured Streaming is a scalable and fault-tolerant stream processing engine 
built on the Spark SQL engine. You can express your streaming computation the 
same way you would express a batch computation on static data. The Spark SQL 
engine will take care of running it incrementally and continuously and updating 
the final result as streaming data continues to arrive. You can use the 
[Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R 
to express streaming aggregations, event-time windows, stream-to-batch joins, 
etc. The computation is executed on the same optimized Spark SQL engine. 
Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees 
through checkpointing and Write Ahead Logs. In short, *Structured Streaming 
provides fast, scalable, fault-tolerant, end-to-end exactly-once stream 
processing without the user having to reason about streaming.*
 
-**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still 
experimental. In this guide, we are going to walk you through the programming 
model and the APIs. First, let's start with a simple example - a streaming word 
count. 
+**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still 
experimental. In this guide, we are going to walk you through the programming 
model and the APIs. First, let's start with a simple example - a streaming word 
count.
 
 # Quick Example
-Let’s say you want to maintain a running word count of text data received 
from a data server listening on a TCP socket. Let’s see how you can express 
this using Structured Streaming. You can see the full code in 
-[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py).
+Let’s say you want to maintain a running word count of text data received 
from a data server listening on a TCP socket. Let’s see how you can express 
this using Structured Streaming. You can see the full code in
+[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py)/[R]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming/structured_network_wordcount.R).
 And if you [download Spark](http://spark.apache.org/downloads.html), you can 
directly run the example. In any case, let’s walk through the example 
step-by-step and understand how it works. First, we have to import the 
necessary classes and create a local SparkSession, the starting point of all 
functionalities related to Spark.
 
 <div class="codetabs">
@@ -64,6 +64,13 @@ spark = SparkSession \
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+sparkR.session(appName = "StructuredNetworkWordCount")
+{% endhighlight %}
+
+</div>
 </div>
 
 Next, let’s create a streaming DataFrame that represents text data received 
from a server listening on localhost:9999, and transform the DataFrame to 
calculate word counts.
@@ -137,6 +144,22 @@ wordCounts = words.groupBy("word").count()
 This `lines` DataFrame represents an unbounded table containing the streaming 
text data. This table contains one column of strings named "value", and each 
line in the streaming text data becomes a row in the table. Note, that this is 
not currently receiving any data as we are just setting up the transformation, 
and have not yet started it. Next, we have used two built-in SQL functions - 
split and explode, to split each line into multiple rows with a word each. In 
addition, we use the function `alias` to name the new column as "word". 
Finally, we have defined the `wordCounts` DataFrame by grouping by the unique 
values in the Dataset and counting them. Note that this is a streaming 
DataFrame which represents the running word counts of the stream.
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Create DataFrame representing the stream of input lines from connection to 
localhost:9999
+lines <- read.stream("socket", host = "localhost", port = 9999)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(group_by(words, "word"))
+{% endhighlight %}
+
+This `lines` SparkDataFrame represents an unbounded table containing the 
streaming text data. This table contains one column of strings named "value", 
and each line in the streaming text data becomes a row in the table. Note, that 
this is not currently receiving any data as we are just setting up the 
transformation, and have not yet started it. Next, we have a SQL expression 
with two SQL functions - split and explode, to split each line into multiple 
rows with a word each. In addition, we name the new column as "word". Finally, 
we have defined the `wordCounts` SparkDataFrame by grouping by the unique 
values in the SparkDataFrame and counting them. Note that this is a streaming 
SparkDataFrame which represents the running word counts of the stream.
+
+</div>
 </div>
 
 We have now set up the query on the streaming data. All that is left is to 
actually start receiving data and computing the counts. To do this, we set it 
up to print the complete set of counts (specified by `outputMode("complete")`) 
to the console every time they are updated. And then start the streaming 
computation using `start()`.
@@ -182,9 +205,19 @@ query.awaitTermination()
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+
+awaitTermination(query)
+{% endhighlight %}
+
+</div>
 </div>
 
-After this code is executed, the streaming computation will have started in 
the background. The `query` object is a handle to that active streaming query, 
and we have decided to wait for the termination of the query using 
`query.awaitTermination()` to prevent the process from exiting while the query 
is active.
+After this code is executed, the streaming computation will have started in 
the background. The `query` object is a handle to that active streaming query, 
and we have decided to wait for the termination of the query using 
`awaitTermination()` to prevent the process from exiting while the query is 
active.
 
 To actually execute this example code, you can either compile the code in your 
own 
 [Spark application](quick-start.html#self-contained-applications), or simply 
@@ -211,6 +244,11 @@ $ ./bin/run-example 
org.apache.spark.examples.sql.streaming.JavaStructuredNetwor
 $ ./bin/spark-submit 
examples/src/main/python/sql/streaming/structured_network_wordcount.py 
localhost 9999
 {% endhighlight %}
 </div>
+<div data-lang="r"  markdown="1">
+{% highlight bash %}
+$ ./bin/spark-submit 
examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
+{% endhighlight %}
+</div>
 </div>
 
 Then, any lines typed in the terminal running the netcat server will be 
counted and printed on screen every second. It will look something like the 
following.
@@ -338,6 +376,35 @@ Batch: 1
 ...
 {% endhighlight %}
 </div>
+<div data-lang="r" markdown="1">
+{% highlight bash %}
+# TERMINAL 2: RUNNING structured_network_wordcount.R
+
+$ ./bin/spark-submit 
examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
+
+-------------------------------------------
+Batch: 0
+-------------------------------------------
++------+-----+
+| value|count|
++------+-----+
+|apache|    1|
+| spark|    1|
++------+-----+
+
+-------------------------------------------
+Batch: 1
+-------------------------------------------
++------+-----+
+| value|count|
++------+-----+
+|apache|    2|
+| spark|    1|
+|hadoop|    1|
++------+-----+
+...
+{% endhighlight %}
+</div>
 </div>
     </td>
 </table>
@@ -409,14 +476,14 @@ to track the read position in the stream. The engine uses 
checkpointing and writ
 
 # API using Datasets and DataFrames
 Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, 
as well as streaming, unbounded data. Similar to static Datasets/DataFrames, 
you can use the common entry point `SparkSession`
-([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession)
 docs)
+([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession)/[R](api/R/sparkR.session.html)
 docs)
 to create streaming DataFrames/Datasets from streaming sources, and apply the 
same operations on them as static DataFrames/Datasets. If you are not familiar 
with Datasets/DataFrames, you are strongly advised to familiarize yourself with 
them using the
 [DataFrame/Dataset Programming Guide](sql-programming-guide.html).
 
 ## Creating streaming DataFrames and streaming Datasets
-Streaming DataFrames can be created through the `DataStreamReader` interface 
+Streaming DataFrames can be created through the `DataStreamReader` interface
 
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader)
 docs)
-returned by `SparkSession.readStream()`. Similar to the read interface for 
creating static DataFrame, you can specify the details of the source – data 
format, schema, options, etc.
+returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with 
the `read.stream()` method. Similar to the read interface for creating static 
DataFrame, you can specify the details of the source – data format, schema, 
options, etc.
 
 #### Input Sources
 In Spark 2.0, there are a few built-in sources.
@@ -445,7 +512,8 @@ Here are the details of all the sources in Spark.
         <code>path</code>: path to the input directory, and common to all file 
formats.
         <br/><br/>
         For file-format-specific options, see the related methods in 
<code>DataStreamReader</code>
-        (<a 
href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a
 
href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a
 
href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>).
+        (<a 
href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a
 
href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a
 
href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
+        href="api/R/read.stream.html">R</a>).
         E.g. for "parquet" format options see 
<code>DataStreamReader.parquet()</code></td>
     <td>Yes</td>
     <td>Supports glob paths, but does not support multiple comma-separated 
paths/globs.</td>
@@ -483,7 +551,7 @@ Here are some examples.
 {% highlight scala %}
 val spark: SparkSession = ...
 
-// Read text from socket 
+// Read text from socket
 val socketDF = spark
   .readStream
   .format("socket")
@@ -493,7 +561,7 @@ val socketDF = spark
 
 socketDF.isStreaming    // Returns True for DataFrames that have streaming 
sources
 
-socketDF.printSchema 
+socketDF.printSchema
 
 // Read all the csv files written atomically in a directory
 val userSchema = new StructType().add("name", "string").add("age", "integer")
@@ -510,7 +578,7 @@ val csvDF = spark
 {% highlight java %}
 SparkSession spark = ...
 
-// Read text from socket 
+// Read text from socket
 Dataset<Row> socketDF = spark
   .readStream()
   .format("socket")
@@ -537,7 +605,7 @@ Dataset<Row> csvDF = spark
 {% highlight python %}
 spark = SparkSession. ...
 
-# Read text from socket 
+# Read text from socket
 socketDF = spark \
     .readStream \
     .format("socket") \
@@ -547,7 +615,7 @@ socketDF = spark \
 
 socketDF.isStreaming()    # Returns True for DataFrames that have streaming 
sources
 
-socketDF.printSchema() 
+socketDF.printSchema()
 
 # Read all the csv files written atomically in a directory
 userSchema = StructType().add("name", "string").add("age", "integer")
@@ -559,6 +627,25 @@ csvDF = spark \
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+sparkR.session(...)
+
+# Read text from socket
+socketDF <- read.stream("socket", host = hostname, port = port)
+
+isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have 
streaming sources
+
+printSchema(socketDF)
+
+# Read all the csv files written atomically in a directory
+schema <- structType(structField("name", "string"),
+                     structField("age", "integer"))
+csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep 
= ";")
+{% endhighlight %}
+
+</div>
 </div>
 
 These examples generate streaming DataFrames that are untyped, meaning that 
the schema of the DataFrame is not checked at compile time, only checked at 
runtime when the query is submitted. Some operations like `map`, `flatMap`, 
etc. need the type to be known at compile time. To do those, you can convert 
these untyped streaming DataFrames to typed streaming Datasets using the same 
methods as static DataFrame. See the [SQL Programming 
Guide](sql-programming-guide.html) for more details. Additionally, more details 
on the supported streaming sources are discussed later in the document.
@@ -638,12 +725,24 @@ ds.groupByKey((MapFunction<DeviceData, String>) value -> 
value.getDeviceType(),
 df = ...  # streaming DataFrame with IOT device data with schema { device: 
string, deviceType: string, signal: double, time: DateType }
 
 # Select the devices which have signal more than 10
-df.select("device").where("signal > 10")                              
+df.select("device").where("signal > 10")
 
 # Running count of the number of updates for each device type
 df.groupBy("deviceType").count()
 {% endhighlight %}
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+df <- ...  # streaming DataFrame with IOT device data with schema { device: 
string, deviceType: string, signal: double, time: DateType }
+
+# Select the devices which have signal more than 10
+select(where(df, "signal > 10"), "device")
+
+# Running count of the number of updates for each device type
+count(groupBy(df, "deviceType"))
+{% endhighlight %}
+</div>
 </div>
 
 ### Window Operations on Event Time
@@ -840,7 +939,7 @@ Streaming DataFrames can be joined with static DataFrames 
to create new streamin
 
 {% highlight scala %}
 val staticDf = spark.read. ...
-val streamingDf = spark.readStream. ... 
+val streamingDf = spark.readStream. ...
 
 streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
 streamingDf.join(staticDf, "type", "right_join")  // right outer join with a 
static DF  
@@ -972,7 +1071,7 @@ Once you have defined the final result DataFrame/Dataset, 
all that is left is fo
 
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter)
 docs)
 returned through `Dataset.writeStream()`. You will have to specify one or more 
of the following in this interface.
 
-- *Details of the output sink:* Data format, location, etc. 
+- *Details of the output sink:* Data format, location, etc.
 
 - *Output mode:* Specify what gets written to the output sink.
 
@@ -1077,7 +1176,7 @@ Here is the compatibility matrix.
 #### Output Sinks
 There are a few types of built-in output sinks.
 
-- **File sink** - Stores the output to a directory. 
+- **File sink** - Stores the output to a directory.
 
 {% highlight scala %}
 writeStream
@@ -1145,7 +1244,8 @@ Here are the details of all the sinks in Spark.
         · "s3a://a/b/c/dataset.txt"<br/>
         <br/>
         For file-format-specific options, see the related methods in 
DataFrameWriter
-        (<a 
href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a 
href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a 
href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
+        (<a 
href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a 
href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a 
href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
+        href="api/R/write.stream.html">R</a>).
         E.g. for "parquet" format options see 
<code>DataFrameWriter.parquet()</code>
     </td>
     <td>Yes</td>
@@ -1208,7 +1308,7 @@ noAggDF
   .option("checkpointLocation", "path/to/checkpoint/dir")
   .option("path", "path/to/destination/dir")
   .start()
-   
+
 // ========== DF with aggregation ==========
 val aggDF = df.groupBy("device").count()
 
@@ -1219,7 +1319,7 @@ aggDF
   .format("console")
   .start()
 
-// Have all the aggregates in an in-memory table 
+// Have all the aggregates in an in-memory table
 aggDF
   .writeStream
   .queryName("aggregates")    // this query name will be the table name
@@ -1250,7 +1350,7 @@ noAggDF
   .option("checkpointLocation", "path/to/checkpoint/dir")
   .option("path", "path/to/destination/dir")
   .start();
-   
+
 // ========== DF with aggregation ==========
 Dataset<Row> aggDF = df.groupBy("device").count();
 
@@ -1261,7 +1361,7 @@ aggDF
   .format("console")
   .start();
 
-// Have all the aggregates in an in-memory table 
+// Have all the aggregates in an in-memory table
 aggDF
   .writeStream()
   .queryName("aggregates")    // this query name will be the table name
@@ -1292,7 +1392,7 @@ noAggDF \
     .option("checkpointLocation", "path/to/checkpoint/dir") \
     .option("path", "path/to/destination/dir") \
     .start()
-   
+
 # ========== DF with aggregation ==========
 aggDF = df.groupBy("device").count()
 
@@ -1315,6 +1415,35 @@ spark.sql("select * from aggregates").show()   # 
interactively query in-memory t
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# ========== DF with no aggregations ==========
+noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
+
+# Print new data to console
+write.stream(noAggDF, "console")
+
+# Write new data to Parquet files
+write.stream(noAggDF,
+             "parquet",
+             path = "path/to/destination/dir",
+             checkpointLocation = "path/to/checkpoint/dir")
+
+# ========== DF with aggregation ==========
+aggDF <- count(groupBy(df, "device"))
+
+# Print updated aggregations to console
+write.stream(aggDF, "console", outputMode = "complete")
+
+# Have all the aggregates in an in memory table. The query name will be the 
table name
+write.stream(aggDF, "memory", queryName = "aggregates", outputMode = 
"complete")
+
+# Interactively query in-memory table
+head(sql("select * from aggregates"))
+{% endhighlight %}
+
+</div>
 </div>
 
 #### Using Foreach
@@ -1351,7 +1480,7 @@ query.name        // get the name of the auto-generated 
or user-specified name
 
 query.explain()   // print detailed explanations of the query
 
-query.stop()      // stop the query 
+query.stop()      // stop the query
 
 query.awaitTermination()   // block until query is terminated, with stop() or 
with error
 
@@ -1403,7 +1532,7 @@ query.name()        # get the name of the auto-generated 
or user-specified name
 
 query.explain()   # print detailed explanations of the query
 
-query.stop()      # stop the query 
+query.stop()      # stop the query
 
 query.awaitTermination()   # block until query is terminated, with stop() or with error
 
@@ -1416,6 +1545,24 @@ query.lastProgress()    # the most recent progress update of this streaming quer
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+query <- write.stream(df, "console")  # get the query object
+
+queryName(query)          # get the name of the auto-generated or user-specified name
+
+explain(query)            # print detailed explanations of the query
+
+stopQuery(query)          # stop the query
+
+awaitTermination(query)   # block until query is terminated, with stop() or with error
+
+lastProgress(query)       # the most recent progress update of this streaming query
+
+{% endhighlight %}
+
+</div>
 </div>
 
 You can start any number of queries in a single SparkSession. They will all be running concurrently sharing the cluster resources. You can use `sparkSession.streams()` to get the `StreamingQueryManager`
@@ -1462,6 +1609,12 @@ spark.streams().awaitAnyTermination()  # block until any one of them terminates
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+{% highlight bash %}
+Not available in R.
+{% endhighlight %}
+
+</div>
 </div>
 
 
@@ -1645,6 +1798,58 @@ Will print something like the following.
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+query <- ...  # a StreamingQuery
+lastProgress(query)
+
+'''
+Will print something like the following.
+
+{
+  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
+  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
+  "name" : null,
+  "timestamp" : "2017-04-26T08:27:28.835Z",
+  "numInputRows" : 0,
+  "inputRowsPerSecond" : 0.0,
+  "processedRowsPerSecond" : 0.0,
+  "durationMs" : {
+    "getOffset" : 0,
+    "triggerExecution" : 1
+  },
+  "stateOperators" : [ {
+    "numRowsTotal" : 4,
+    "numRowsUpdated" : 0
+  } ],
+  "sources" : [ {
+    "description" : "TextSocketSource[host: localhost, port: 9999]",
+    "startOffset" : 1,
+    "endOffset" : 1,
+    "numInputRows" : 0,
+    "inputRowsPerSecond" : 0.0,
+    "processedRowsPerSecond" : 0.0
+  } ],
+  "sink" : {
+    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
+  }
+}
+'''
+
+status(query)
+'''
+Will print something like the following.
+
+{
+  "message" : "Waiting for data to arrive",
+  "isDataAvailable" : false,
+  "isTriggerActive" : false
+}
+'''
+{% endhighlight %}
+
+</div>
 </div>
 
 ### Asynchronous API
@@ -1704,10 +1909,16 @@ Not available in Python.
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+{% highlight bash %}
+Not available in R.
+{% endhighlight %}
+
+</div>
 </div>
 
 ## Recovering from Failures with Checkpointing 
-In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). 
+In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
 
 <div class="codetabs">
 <div data-lang="scala"  markdown="1">
@@ -1746,19 +1957,17 @@ aggDF \
 {% endhighlight %}
 
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
+{% endhighlight %}
+
+</div>
 </div>
 
 # Where to go from here
-- Examples: See and run the 
-[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)
 
+- Examples: See and run the
+[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r/streaming)
 examples.
 - Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
-
-
-
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/b8302ccd/examples/src/main/r/streaming/structured_network_wordcount.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/streaming/structured_network_wordcount.R b/examples/src/main/r/streaming/structured_network_wordcount.R
new file mode 100644
index 0000000..cda18eb
--- /dev/null
+++ b/examples/src/main/r/streaming/structured_network_wordcount.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Counts words in UTF8 encoded, '\n' delimited text received from the network.
+
+# To run this on your local machine, you need to first run a Netcat server
+# $ nc -lk 9999
+# and then run the example
+# ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
+
+# Load SparkR library into your R session
+library(SparkR)
+
+# Initialize SparkSession
+sparkR.session(appName = "SparkR-Streaming-structured-network-wordcount-example")
+
+args <- commandArgs(trailing = TRUE)
+
+if (length(args) != 2) {
+  print("Usage: structured_network_wordcount.R <hostname> <port>")
+  print("<hostname> and <port> describe the TCP server that Structured Streaming")
+  print("would connect to receive data.")
+  q("no")
+}
+
+hostname <- args[[1]]
+port <- as.integer(args[[2]])
+
+# Create DataFrame representing the stream of input lines from connection to localhost:9999
+lines <- read.stream("socket", host = hostname, port = port)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(groupBy(words, "word"))
+
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+
+awaitTermination(query)
+
+sparkR.session.stop()
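
Note on the example above (not part of this commit): the running count that `structured_network_wordcount.R` prints with `outputMode = "complete"` can be illustrated without a Spark installation. The following is a minimal plain-Python sketch under stated assumptions: the function name `running_word_counts` and the list-of-batches input are hypothetical and chosen only for illustration, and splitting on a single space only approximates `split(value, ' ')`.

```python
from collections import Counter


def running_word_counts(batches):
    """Yield the full word-count table after each batch of input lines.

    This imitates "complete" output mode: every result contains the counts
    for all words seen so far, not only the words in the latest batch.
    `batches` is a hypothetical stand-in for data arriving on the socket.
    """
    totals = Counter()
    for lines in batches:
        for line in lines:
            # Approximates explode(split(value, ' ')): one word per space-separated token
            totals.update(line.split(" "))
        # Copy so that later batches do not mutate tables already yielded
        yield dict(totals)


if __name__ == "__main__":
    batches = [["apache spark", "apache hadoop"], ["spark streaming"]]
    for batch_id, table in enumerate(running_word_counts(batches)):
        print("Batch:", batch_id, sorted(table.items()))
```

Each yielded table repeats earlier words with their updated totals, which is why an aggregation query written to the console sink in the R example is started in complete mode.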

