Repository: spark
Updated Branches:
  refs/heads/master befab9c1c -> d4eee9932


[MINOR][DOCS] Fix minor typos in python example code

## What changes were proposed in this pull request?

Fix minor typos in the Python example code in the streaming programming guide

## How was this patch tested?

N/A

Author: Dmitriy Sokolov <[email protected]>

Closes #14805 from silentsokolov/fix-typos.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4eee993
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4eee993
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4eee993

Branch: refs/heads/master
Commit: d4eee9932edf1a489d7fe9120a0f003150834df6
Parents: befab9c
Author: Dmitriy Sokolov <[email protected]>
Authored: Tue Aug 30 11:23:37 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Tue Aug 30 11:23:37 2016 +0100

----------------------------------------------------------------------
 docs/mllib-data-types.md                       | 16 ++---
 docs/programming-guide.md                      | 16 +++--
 docs/quick-start.md                            |  6 +-
 docs/streaming-kafka-0-8-integration.md        |  4 +-
 docs/streaming-programming-guide.md            | 33 +++++----
 docs/structured-streaming-programming-guide.md | 79 ++++++++++-----------
 6 files changed, 77 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4eee993/docs/mllib-data-types.md
----------------------------------------------------------------------
diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md
index 7dd3c97..35cee32 100644
--- a/docs/mllib-data-types.md
+++ b/docs/mllib-data-types.md
@@ -104,7 +104,7 @@ dv2 = [1.0, 0.0, 3.0]
 # Create a SparseVector.
 sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
 # Use a single-column SciPy csc_matrix as a sparse vector.
-sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 
2])), shape = (3, 1))
+sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 
2])), shape=(3, 1))
 {% endhighlight %}
 
 </div>
@@ -517,12 +517,12 @@ from pyspark.mllib.linalg.distributed import IndexedRow, 
IndexedRowMatrix
 
 # Create an RDD of indexed rows.
 #   - This can be done explicitly with the IndexedRow class:
-indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]), 
-                              IndexedRow(1, [4, 5, 6]), 
-                              IndexedRow(2, [7, 8, 9]), 
+indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+                              IndexedRow(1, [4, 5, 6]),
+                              IndexedRow(2, [7, 8, 9]),
                               IndexedRow(3, [10, 11, 12])])
 #   - or by using (long, vector) tuples:
-indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]), 
+indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                               (2, [7, 8, 9]), (3, [10, 11, 12])])
 
 # Create an IndexedRowMatrix from an RDD of IndexedRows.
@@ -731,15 +731,15 @@ from pyspark.mllib.linalg import Matrices
 from pyspark.mllib.linalg.distributed import BlockMatrix
 
 # Create an RDD of sub-matrix blocks.
-blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), 
+blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 
12]))])
 
 # Create a BlockMatrix from an RDD of sub-matrix blocks.
 mat = BlockMatrix(blocks, 3, 2)
 
 # Get its size.
-m = mat.numRows() # 6
-n = mat.numCols() # 2
+m = mat.numRows()  # 6
+n = mat.numCols()  # 2
 
 # Get the blocks as an RDD of sub-matrix blocks.
 blocksRDD = mat.blocks

http://git-wip-us.apache.org/repos/asf/spark/blob/d4eee993/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 40287d7..74d5ee1 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -445,7 +445,7 @@ Similarly to text files, SequenceFiles can be saved and 
loaded by specifying the
 classes can be specified, but for standard Writables this is not required.
 
 {% highlight python %}
->>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
 >>> rdd.saveAsSequenceFile("path/to/file")
 >>> sorted(sc.sequenceFile("path/to/file").collect())
 [(1, u'a'), (2, u'aa'), (3, u'aaa')]
@@ -459,10 +459,12 @@ Elasticsearch ESInputFormat:
 
 {% highlight python %}
 $ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
->>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running 
on localhost defaults
->>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
-    "org.apache.hadoop.io.NullWritable", 
"org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
->>> rdd.first()         # the result is a MapWritable that is converted to a 
Python dict
+>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running 
on localhost defaults
+>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
+                             "org.apache.hadoop.io.NullWritable",
+                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
+                             conf=conf)
+>>> rdd.first()  # the result is a MapWritable that is converted to a Python 
dict
 (u'Elasticsearch ID',
  {u'field1': True,
   u'field2': u'Some Text',
@@ -797,7 +799,6 @@ def increment_counter(x):
 rdd.foreach(increment_counter)
 
 print("Counter value: ", counter)
-
 {% endhighlight %}
 </div>
 
@@ -1455,13 +1456,14 @@ The code below shows an accumulator being used to add 
up the elements of an arra
 
 {% highlight python %}
 >>> accum = sc.accumulator(0)
+>>> accum
 Accumulator<id=0, value=0>
 
 >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
 ...
 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
 
-scala> accum.value
+>>> accum.value
 10
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d4eee993/docs/quick-start.md
----------------------------------------------------------------------
diff --git a/docs/quick-start.md b/docs/quick-start.md
index a29e28f..2eab8d1 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -74,10 +74,10 @@ Spark's primary abstraction is a distributed collection of 
items called a Resili
 RDDs have _[actions](programming-guide.html#actions)_, which return values, 
and _[transformations](programming-guide.html#transformations)_, which return 
pointers to new RDDs. Let's start with a few actions:
 
 {% highlight python %}
->>> textFile.count() # Number of items in this RDD
+>>> textFile.count()  # Number of items in this RDD
 126
 
->>> textFile.first() # First item in this RDD
+>>> textFile.first()  # First item in this RDD
 u'# Apache Spark'
 {% endhighlight %}
 
@@ -90,7 +90,7 @@ Now let's use a transformation. We will use the 
[`filter`](programming-guide.htm
 We can chain together transformations and actions:
 
 {% highlight python %}
->>> textFile.filter(lambda line: "Spark" in line).count() # How many lines 
contain "Spark"?
+>>> textFile.filter(lambda line: "Spark" in line).count()  # How many lines 
contain "Spark"?
 15
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d4eee993/docs/streaming-kafka-0-8-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-8-integration.md 
b/docs/streaming-kafka-0-8-integration.md
index f8f7b95..d3fc9ad 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -195,8 +195,8 @@ Next, we discuss how to use this approach in your streaming 
application.
                    for o in offsetRanges:
                        print "%s %s %s %s" % (o.topic, o.partition, 
o.fromOffset, o.untilOffset)
 
-               directKafkaStream\
-                   .transform(storeOffsetRanges)\
+               directKafkaStream \
+                   .transform(storeOffsetRanges) \
                    .foreachRDD(printOffsetRanges)
        </div>
        </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/d4eee993/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index 82d3647..c0e4f3b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -930,7 +930,7 @@ JavaPairDStream<String, Integer> cleanedDStream = 
wordCounts.transform(
 <div data-lang="python" markdown="1">
 
 {% highlight python %}
-spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
+spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information
 
 # join data stream with spam information to do data cleaning
 cleanedDStream = wordCounts.transform(lambda rdd: 
rdd.join(spamInfoRDD).filter(...))
@@ -1495,16 +1495,15 @@ See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_
 </div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
-
 def getWordBlacklist(sparkContext):
-    if ('wordBlacklist' not in globals()):
-        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
-    return globals()['wordBlacklist']
+    if ("wordBlacklist" not in globals()):
+        globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()["wordBlacklist"]
 
 def getDroppedWordsCounter(sparkContext):
-    if ('droppedWordsCounter' not in globals()):
-        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
-    return globals()['droppedWordsCounter']
+    if ("droppedWordsCounter" not in globals()):
+        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
+    return globals()["droppedWordsCounter"]
 
 def echo(time, rdd):
     # Get or register the blacklist Broadcast
@@ -1626,12 +1625,12 @@ See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_
 
 # Lazily instantiated global instance of SparkSession
 def getSparkSessionInstance(sparkConf):
-    if ('sparkSessionSingletonInstance' not in globals()):
-        globals()['sparkSessionSingletonInstance'] = SparkSession\
-            .builder\
-            .config(conf=sparkConf)\
+    if ("sparkSessionSingletonInstance" not in globals()):
+        globals()["sparkSessionSingletonInstance"] = SparkSession \
+            .builder \
+            .config(conf=sparkConf) \
             .getOrCreate()
-    return globals()['sparkSessionSingletonInstance']
+    return globals()["sparkSessionSingletonInstance"]
 
 ...
 
@@ -1829,11 +1828,11 @@ This behavior is made simple by using 
`StreamingContext.getOrCreate`. This is us
 {% highlight python %}
 # Function to create and setup a new StreamingContext
 def functionToCreateContext():
-    sc = SparkContext(...)   # new context
-    ssc = new StreamingContext(...)
-    lines = ssc.socketTextStream(...) # create DStreams
+    sc = SparkContext(...)  # new context
+    ssc = StreamingContext(...)
+    lines = ssc.socketTextStream(...)  # create DStreams
     ...
-    ssc.checkpoint(checkpointDirectory)   # set checkpoint directory
+    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
     return ssc
 
 # Get StreamingContext from checkpoint data or create a new one

http://git-wip-us.apache.org/repos/asf/spark/blob/d4eee993/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 8a88e06..cdc3975 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -59,9 +59,9 @@ from pyspark.sql import SparkSession
 from pyspark.sql.functions import explode
 from pyspark.sql.functions import split
 
-spark = SparkSession\
-    .builder()\
-    .appName("StructuredNetworkWordCount")\
+spark = SparkSession \
+    .builder() \
+    .appName("StructuredNetworkWordCount") \
     .getOrCreate()
 {% endhighlight %}
 
@@ -124,22 +124,22 @@ This `lines` DataFrame represents an unbounded table 
containing the streaming te
 
 {% highlight python %}
 # Create DataFrame representing the stream of input lines from connection to 
localhost:9999
-lines = spark\
-    .readStream\
-    .format('socket')\
-    .option('host', 'localhost')\
-    .option('port', 9999)\
+lines = spark \
+    .readStream \
+    .format("socket") \
+    .option("host", "localhost") \
+    .option("port", 9999) \
     .load()
 
 # Split the lines into words
 words = lines.select(
    explode(
-       split(lines.value, ' ')
-   ).alias('word')
+       split(lines.value, " ")
+   ).alias("word")
 )
 
 # Generate running word count
-wordCounts = words.groupBy('word').count()
+wordCounts = words.groupBy("word").count()
 {% endhighlight %}
 
 This `lines` DataFrame represents an unbounded table containing the streaming 
text data. This table contains one column of strings named "value", and each 
line in the streaming text data becomes a row in the table. Note, that this is 
not currently receiving any data as we are just setting up the transformation, 
and have not yet started it. Next, we have used two built-in SQL functions - 
split and explode, to split each line into multiple rows with a word each. In 
addition, we use the function `alias` to name the new column as "word". 
Finally, we have defined the `wordCounts` DataFrame by grouping by the unique 
values in the Dataset and counting them. Note that this is a streaming 
DataFrame which represents the running word counts of the stream.
@@ -180,10 +180,10 @@ query.awaitTermination();
 
 {% highlight python %}
  # Start running the query that prints the running counts to the console
-query = wordCounts\
-    .writeStream\
-    .outputMode('complete')\
-    .format('console')\
+query = wordCounts \
+    .writeStream \
+    .outputMode("complete") \
+    .format("console") \
     .start()
 
 query.awaitTermination()
@@ -488,7 +488,7 @@ spark = SparkSession. ...
 
 # Read text from socket 
 socketDF = spark \
-    .readStream()  \
+    .readStream() \
     .format("socket") \
     .option("host", "localhost") \
     .option("port", 9999) \
@@ -504,7 +504,7 @@ csvDF = spark \
     .readStream() \
     .option("sep", ";") \
     .schema(userSchema) \
-    .csv("/path/to/directory")    # Equivalent to 
format("csv").load("/path/to/directory")
+    .csv("/path/to/directory")  # Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 </div>
@@ -596,8 +596,7 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // 
using typed API
 <div data-lang="python"  markdown="1">
 
 {% highlight python %}
-
-df = ...    # streaming DataFrame with IOT device data with schema { device: 
string, type: string, signal: double, time: DateType }
+df = ...  # streaming DataFrame with IOT device data with schema { device: 
string, type: string, signal: double, time: DateType }
 
 # Select the devices which have signal more than 10
 df.select("device").where("signal > 10")                              
@@ -653,11 +652,11 @@ Dataset<Row> windowedCounts = words.groupBy(
 </div>
 <div data-lang="python"  markdown="1">
 {% highlight python %}
-words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: 
String }
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: 
String }
 
 # Group the data by window and word and compute the count of each group
 windowedCounts = words.groupBy(
-    window(words.timestamp, '10 minutes', '5 minutes'),
+    window(words.timestamp, "10 minutes", "5 minutes"),
     words.word
 ).count()
 {% endhighlight %}
@@ -704,7 +703,7 @@ streamingDf.join(staticDf, "type", "right_join");  // right 
outer join with a st
 {% highlight python %}
 staticDf = spark.read. ...
 streamingDf = spark.readStream. ...
-streamingDf.join(staticDf, "type")         # inner equi-join with a static DF
+streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
 streamingDf.join(staticDf, "type", "right_join")  # right outer join with a 
static DF
 {% endhighlight %}
 
@@ -907,25 +906,25 @@ spark.sql("select * from aggregates").show();   // 
interactively query in-memory
 noAggDF = deviceDataDf.select("device").where("signal > 10")   
 
 # Print new data to console
-noAggDF\
-    .writeStream()\
-    .format("console")\
+noAggDF \
+    .writeStream() \
+    .format("console") \
     .start()
 
 # Write new data to Parquet files
-noAggDF\
-    .writeStream()\
-    .parquet("path/to/destination/directory")\
+noAggDF \
+    .writeStream() \
+    .parquet("path/to/destination/directory") \
     .start()
    
 # ========== DF with aggregation ==========
 aggDF = df.groupBy("device").count()
 
 # Print updated aggregations to console
-aggDF\
-    .writeStream()\
-    .outputMode("complete")\
-    .format("console")\
+aggDF \
+    .writeStream() \
+    .outputMode("complete") \
+    .format("console") \
     .start()
 
 # Have all the aggregates in an in memory table. The query name will be the 
table name
@@ -1072,11 +1071,11 @@ spark.streams().awaitAnyTermination();   // block until 
any one of them terminat
 {% highlight python %}
 spark = ...  # spark session
 
-spark.streams().active    # get the list of currently active streaming queries
+spark.streams().active  # get the list of currently active streaming queries
 
-spark.streams().get(id)   # get a query object by its unique id
+spark.streams().get(id)  # get a query object by its unique id
 
-spark.streams().awaitAnyTermination()   # block until any one of them 
terminates
+spark.streams().awaitAnyTermination()  # block until any one of them terminates
 {% endhighlight %}
 
 </div>
@@ -1116,11 +1115,11 @@ aggDF
 <div data-lang="python"  markdown="1">
 
 {% highlight python %}
-aggDF\
-    .writeStream()\
-    .outputMode("complete")\
-    .option("checkpointLocation", "path/to/HDFS/dir")\
-    .format("memory")\
+aggDF \
+    .writeStream() \
+    .outputMode("complete") \
+    .option("checkpointLocation", "path/to/HDFS/dir") \
+    .format("memory") \
     .start()
 {% endhighlight %}
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to