Repository: spark
Updated Branches:
  refs/heads/master fbf1f342a -> 4e930420c


[SPARK-6799] [SPARKR] Remove SparkR RDD examples, add dataframe examples

This PR also makes some of the DataFrame to RDD methods private as the RDD 
class is private in 1.4

cc rxin pwendell

Author: Shivaram Venkataraman <[email protected]>

Closes #5949 from shivaram/sparkr-examples and squashes the following commits:

6c42fdc [Shivaram Venkataraman] Remove SparkR RDD examples, add dataframe 
examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e930420
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e930420
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e930420

Branch: refs/heads/master
Commit: 4e930420c19ae7773b138dfc7db8fc03b4660251
Parents: fbf1f34
Author: Shivaram Venkataraman <[email protected]>
Authored: Wed May 6 17:28:11 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed May 6 17:28:11 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |   4 -
 R/pkg/R/DataFrame.R                       |   2 +-
 examples/src/main/r/dataframe.R           |  54 +++++++++++++
 examples/src/main/r/kmeans.R              |  93 ---------------------
 examples/src/main/r/linear_solver_mnist.R | 107 -------------------------
 examples/src/main/r/logistic_regression.R |  62 --------------
 examples/src/main/r/pi.R                  |  46 -----------
 examples/src/main/r/wordcount.R           |  42 ----------
 8 files changed, 55 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 528e660..3fb92be 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -45,8 +45,6 @@ exportMethods("cache",
               "showDF",
               "sortDF",
               "take",
-              "toJSON",
-              "toRDD",
               "unionAll",
               "unpersist",
               "where",
@@ -95,14 +93,12 @@ export("cacheTable",
        "createExternalTable",
        "dropTempTable",
        "jsonFile",
-       "jsonRDD",
        "loadDF",
        "parquetFile",
        "sql",
        "table",
        "tableNames",
        "tables",
-       "toDF",
        "uncacheTable")
 
 export("sparkRSQL.init",

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 56c305d..47d92f1 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -272,7 +272,7 @@ setMethod("names",
 setMethod("registerTempTable",
           signature(x = "DataFrame", tableName = "character"),
           function(x, tableName) {
-              callJMethod(x@sdf, "registerTempTable", tableName)
+              invisible(callJMethod(x@sdf, "registerTempTable", tableName))
           })
 
 #' insertInto

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/examples/src/main/r/dataframe.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/dataframe.R b/examples/src/main/r/dataframe.R
new file mode 100644
index 0000000..53b8171
--- /dev/null
+++ b/examples/src/main/r/dataframe.R
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# Initialize SparkContext and SQLContext
+sc <- sparkR.init(appName="SparkR-DataFrame-example")
+sqlContext <- sparkRSQL.init(sc)
+
+# Create a simple local data.frame
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
+
+# Convert local data frame to a SparkR DataFrame
+df <- createDataFrame(sqlContext, localDF)
+
+# Print its schema
+printSchema(df)
+# root
+#  |-- name: string (nullable = true)
+#  |-- age: double (nullable = true)
+
+# Create a DataFrame from a JSON file
+path <- file.path(Sys.getenv("SPARK_HOME"), 
"examples/src/main/resources/people.json")
+peopleDF <- jsonFile(sqlContext, path)
+printSchema(peopleDF)
+
+# Register this DataFrame as a table.
+registerTempTable(peopleDF, "people")
+
+# SQL statements can be run by using the sql methods provided by sqlContext
+teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age 
<= 19")
+
+# Call collect to get a local data.frame
+teenagersLocalDF <- collect(teenagers)
+
+# Print the teenagers in our dataset 
+print(teenagersLocalDF)
+
+# Stop the SparkContext now
+sparkR.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/examples/src/main/r/kmeans.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R
deleted file mode 100644
index 6e6b5cb..0000000
--- a/examples/src/main/r/kmeans.R
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-# Logistic regression in Spark.
-# Note: unlike the example in Scala, a point here is represented as a vector of
-# doubles.
-
-parseVectors <-  function(lines) {
-  lines <- strsplit(as.character(lines) , " ", fixed = TRUE)
-  list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]])))
-}
-
-dist.fun <- function(P, C) {
-  apply(
-    C,
-    1, 
-    function(x) { 
-      colSums((t(P) - x)^2)
-    }
-  )
-}
-
-closestPoint <-  function(P, C) {
-  max.col(-dist.fun(P, C))
-}
-# Main program
-
-args <- commandArgs(trailing = TRUE) 
-
-if (length(args) != 3) {
-  print("Usage: kmeans <file> <K> <convergeDist>")
-  q("no")
-}
-
-sc <- sparkR.init(appName = "RKMeans")
-K <- as.integer(args[[2]])
-convergeDist <- as.double(args[[3]])
-
-lines <- textFile(sc, args[[1]])
-points <- cache(lapplyPartition(lines, parseVectors))
-# kPoints <- take(points, K)
-kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
-tempDist <- 1.0
-
-while (tempDist > convergeDist) {
-  closest <- lapplyPartition(
-    lapply(points,
-           function(p) {
-             cp <- closestPoint(p, kPoints); 
-             mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), 
SIMPLIFY=FALSE)
-           }),
-    function(x) {do.call(c, x)
-    })
-  
-  pointStats <- reduceByKey(closest,
-                            function(p1, p2) {
-                              t(colSums(rbind(p1, p2)))
-                            },
-                            2L)
-  
-  newPoints <- do.call(
-    rbind,
-    collect(lapply(pointStats,
-                   function(tup) {
-                     point.sum <- tup[[2]][, -1]
-                     point.count <- tup[[2]][, 1]
-                     point.sum/point.count
-                   })))
-  
-  D <- dist.fun(kPoints, newPoints)
-  tempDist <- sum(D[cbind(1:3, max.col(-D))])
-  kPoints <- newPoints
-  cat("Finished iteration (delta = ", tempDist, ")\n")
-}
-
-cat("Final centers:\n")
-writeLines(unlist(lapply(kPoints, paste, collapse = " ")))

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/examples/src/main/r/linear_solver_mnist.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/linear_solver_mnist.R 
b/examples/src/main/r/linear_solver_mnist.R
deleted file mode 100644
index c864a42..0000000
--- a/examples/src/main/r/linear_solver_mnist.R
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Instructions: 
https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2
-
-library(SparkR)
-library(Matrix)
-
-args <- commandArgs(trailing = TRUE)
-
-# number of random features; default to 1100
-D <- ifelse(length(args) > 0, as.integer(args[[1]]), 1100)
-# number of partitions for training dataset
-trainParts <- 12
-# dimension of digits
-d <- 784
-# number of test examples
-NTrain <- 60000
-# number of training examples
-NTest <- 10000
-# scale of features
-gamma <- 4e-4
-
-sc <- sparkR.init(appName = "SparkR-LinearSolver")
-
-# You can also use HDFS path to speed things up:
-# hdfs://<master>/train-mnist-dense-with-labels.data
-file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts)
-
-W <- gamma * matrix(nrow=D, ncol=d, data=rnorm(D*d))
-b <- 2 * pi * matrix(nrow=D, ncol=1, data=runif(D))
-broadcastW <- broadcast(sc, W)
-broadcastB <- broadcast(sc, b)
-
-includePackage(sc, Matrix)
-numericLines <- lapplyPartitionsWithIndex(file,
-                       function(split, part) {
-                         matList <- sapply(part, function(line) {
-                           as.numeric(strsplit(line, ",", fixed=TRUE)[[1]])
-                         }, simplify=FALSE)
-                         mat <- Matrix(ncol=d+1, data=unlist(matList, F, F),
-                                       sparse=T, byrow=T)
-                         mat
-                       })
-
-featureLabels <- cache(lapplyPartition(
-    numericLines,
-    function(part) {
-      label <- part[,1]
-      mat <- part[,-1]
-      ones <- rep(1, nrow(mat))
-      features <- cos(
-        mat %*% t(value(broadcastW)) + (matrix(ncol=1, data=ones) %*% 
t(value(broadcastB))))
-      onesMat <- Matrix(ones)
-      featuresPlus <- cBind(features, onesMat)
-      labels <- matrix(nrow=nrow(mat), ncol=10, data=-1)
-      for (i in 1:nrow(mat)) {
-        labels[i, label[i]] <- 1
-      }
-      list(label=labels, features=featuresPlus)
-  }))
-
-FTF <- Reduce("+", collect(lapplyPartition(featureLabels,
-    function(part) {
-      t(part$features) %*% part$features
-    }), flatten=F))
-
-FTY <- Reduce("+", collect(lapplyPartition(featureLabels,
-    function(part) {
-      t(part$features) %*% part$label
-    }), flatten=F))
-
-# solve for the coefficient matrix
-C <- solve(FTF, FTY)
-
-test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data",
-                         header=F), sparse=T))
-testData <- test[,-1]
-testLabels <- matrix(ncol=1, test[,1])
-
-err <- 0
-
-# contstruct the feature maps for all examples from this digit
-featuresTest <- cos(testData %*% t(value(broadcastW)) +
-    (matrix(ncol=1, data=rep(1, NTest)) %*% t(value(broadcastB))))
-featuresTest <- cBind(featuresTest, Matrix(rep(1, NTest)))
-
-# extract the one vs. all assignment
-results <- featuresTest %*% C
-labelsGot <- apply(results, 1, which.max)
-err <- sum(testLabels != labelsGot) / nrow(testLabels)
-
-cat("\nFinished running. The error rate is: ", err, ".\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/examples/src/main/r/logistic_regression.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/logistic_regression.R 
b/examples/src/main/r/logistic_regression.R
deleted file mode 100644
index 2a86aa9..0000000
--- a/examples/src/main/r/logistic_regression.R
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 3) {
-  print("Usage: logistic_regression <file> <iters> <dimension>")
-  q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "LogisticRegressionR")
-iterations <- as.integer(args[[2]])
-D <- as.integer(args[[3]])
-
-readPartition <- function(part){
-  part = strsplit(part, " ", fixed = T)
-  list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
-}
-
-# Read data points and convert each partition to a matrix
-points <- cache(lapplyPartition(textFile(sc, args[[1]]), readPartition))
-
-# Initialize w to a random value
-w <- runif(n=D, min = -1, max = 1)
-cat("Initial w: ", w, "\n")
-
-# Compute logistic regression gradient for a matrix of data points
-gradient <- function(partition) {
-  partition = partition[[1]]
-  Y <- partition[, 1]  # point labels (first column of input file)
-  X <- partition[, -1] # point coordinates
-
-  # For each point (x, y), compute gradient function
-  dot <- X %*% w
-  logit <- 1 / (1 + exp(-Y * dot))
-  grad <- t(X) %*% ((logit - 1) * Y)
-  list(grad)
-}
-
-for (i in 1:iterations) {
-  cat("On iteration ", i, "\n")
-  w <- w - reduce(lapplyPartition(points, gradient), "+")
-}
-
-cat("Final w: ", w, "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/examples/src/main/r/pi.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R
deleted file mode 100644
index aa7a833..0000000
--- a/examples/src/main/r/pi.R
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-sc <- sparkR.init(appName = "PiR")
-
-slices <- ifelse(length(args) > 1, as.integer(args[[2]]), 2)
-
-n <- 100000 * slices
-
-piFunc <- function(elem) {
-  rands <- runif(n = 2, min = -1, max = 1)
-  val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
-  val
-}
-
-
-piFuncVec <- function(elems) {
-  message(length(elems))
-  rands1 <- runif(n = length(elems), min = -1, max = 1)
-  rands2 <- runif(n = length(elems), min = -1, max = 1)
-  val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
-  sum(val)
-}
-
-rdd <- parallelize(sc, 1:n, slices)
-count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
-cat("Pi is roughly", 4.0 * count / n, "\n")
-cat("Num elements in RDD ", count(rdd), "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/4e930420/examples/src/main/r/wordcount.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R
deleted file mode 100644
index b734cb0..0000000
--- a/examples/src/main/r/wordcount.R
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 1) {
-  print("Usage: wordcount <file>")
-  q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "RwordCount")
-lines <- textFile(sc, args[[1]])
-
-words <- flatMap(lines,
-                 function(line) {
-                   strsplit(line, " ")[[1]]
-                 })
-wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-counts <- reduceByKey(wordCount, "+", 2L)
-output <- collect(counts)
-
-for (wordcount in output) {
-  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to