http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R b/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R deleted file mode 100644 index c445d1b..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R +++ /dev/null @@ -1,360 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -.sparkREnv <- new.env() - -# Utility function that returns TRUE if we have an active connection to the -# backend and FALSE otherwise -connExists <- function(env) { - tryCatch({ - exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]]) - }, - error = function(err) { - return(FALSE) - }) -} - -#' Stop the Spark context. -#' -#' Also terminates the backend this R session is connected to -sparkR.stop <- function() { - env <- .sparkREnv - if (exists(".sparkRCon", envir = env)) { - # cat("Stopping SparkR\n") - if (exists(".sparkRjsc", envir = env)) { - sc <- get(".sparkRjsc", envir = env) - callJMethod(sc, "stop") - rm(".sparkRjsc", envir = env) - } - - if (exists(".backendLaunched", envir = env)) { - callJStatic("SparkRHandler", "stopBackend") - } - - # Also close the connection and remove it from our env - conn <- get(".sparkRCon", envir = env) - close(conn) - - rm(".sparkRCon", envir = env) - rm(".scStartTime", envir = env) - } - - if (exists(".monitorConn", envir = env)) { - conn <- get(".monitorConn", envir = env) - close(conn) - rm(".monitorConn", envir = env) - } - - # Clear all broadcast variables we have - # as the jobj will not be valid if we restart the JVM - clearBroadcastVariables() - - # Clear jobj maps - clearJobjs() -} - -#' Initialize a new Spark Context. -#' -#' This function initializes a new SparkContext. -#' -#' @param master The Spark master URL. -#' @param appName Application name to register with cluster manager -#' @param sparkHome Spark Home directory -#' @param sparkEnvir Named list of environment variables to set on worker nodes. -#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors. -#' @param sparkJars Character string vector of jar files to pass to the worker nodes. 
-#' @param sparkPackages Character string vector of packages from spark-packages.org
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
-#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
-#'                  list(spark.executor.memory="1g"))
-#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
-#'                  list(spark.executor.memory="1g"),
-#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
-#'                  c("jarfile1.jar","jarfile2.jar"))
-#'}
-
-sparkR.init <- function(
-  master = "",
-  appName = "SparkR",
-  sparkHome = Sys.getenv("SPARK_HOME"),
-  sparkEnvir = list(),
-  sparkExecutorEnv = list(),
-  sparkJars = "",
-  sparkPackages = "") {
-
-  if (exists(".sparkRjsc", envir = .sparkREnv)) {
-    cat(paste("Re-using existing Spark Context.",
-              "Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
-    return(get(".sparkRjsc", envir = .sparkREnv))
-  }
-
-  jars <- suppressWarnings(normalizePath(as.character(sparkJars)))
-
-  # The classpath separator is ";" on Windows, and a file URI there needs
-  # four slashes ("file:////"); see http://stackoverflow.com/a/18522792
-  if (.Platform$OS.type == "unix") {
-    uriSep <- "//"
-  } else {
-    uriSep <- "////"
-  }
-
-  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
-  if (existingPort != "") {
-    backendPort <- existingPort
-  } else {
-    path <- tempfile(pattern = "backend_port")
-    launchBackend(
-        args = path,
-        sparkHome = sparkHome,
-        jars = jars,
-        sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
-        packages = sparkPackages)
-    # wait at most ~100 seconds (25 sleeps with 1.25x backoff) for the JVM to launch
-    wait <- 0.1
-    for (i in 1:25) {
-      Sys.sleep(wait)
-      if (file.exists(path)) {
-        break
-      }
-      wait <- wait * 1.25
-    }
-    if (!file.exists(path)) {
-      stop("JVM is not ready after 100 seconds")
-    }
-    f <- file(path, open = "rb")
-    backendPort <- readInt(f)
-    monitorPort <- readInt(f)
-    close(f)
-    file.remove(path)
-    if (length(backendPort) == 0 || backendPort == 0 ||
-        length(monitorPort) == 0 || monitorPort == 0) {
-      stop("JVM failed to launch")
-    }
-    assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
-    assign(".backendLaunched", 1, envir = .sparkREnv)
-  }
-
-  .sparkREnv$backendPort <- backendPort
-  tryCatch({
-    connectBackend("localhost", backendPort)
-  },
-  error = function(err) {
-    stop("Failed to connect to JVM\n")
-  })
-
-  if (nchar(sparkHome) != 0) {
-    sparkHome <- suppressWarnings(normalizePath(sparkHome))
-  }
-
-  sparkEnvirMap <- new.env()
-  for (varname in names(sparkEnvir)) {
-    sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
-  }
-
-  sparkExecutorEnvMap <- new.env()
-  if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
-    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
-      paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
-  }
-  for (varname in names(sparkExecutorEnv)) {
-    sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
-  }
-
-  nonEmptyJars <- Filter(function(x) { x != "" }, jars)
-  localJarPaths <- sapply(nonEmptyJars,
-                          function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
-
-  # Set the start time to identify jobjs
-  # Seconds resolution is good enough for this purpose, so use ints
-  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
-
-  assign(
-    ".sparkRjsc",
-    callJStatic(
-      "org.apache.spark.api.r.RRDD",
-      "createSparkContext",
-      master,
-      appName,
-      as.character(sparkHome),
-      as.list(localJarPaths),
-      sparkEnvirMap,
-      sparkExecutorEnvMap),
-    envir = .sparkREnv
-  )
-
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
-
-  # Register a finalizer to sleep 1 second on R exit to make RStudio happy
-  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
-
-  sc
-}
-
-#' Connect to the R backend.
-#'
-#' This function establishes a connection with the R backend without creating
-#' a new SparkContext.
-#'
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.connect()
-#'}
-
-sparkR.connect <- function() {
-  if (connExists(.sparkREnv)) {
-    print("Connection to SparkR backend has already been established!")
-    return()
-  }
-
-  # Only allow connecting to an existing backend
-  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
-  if (existingPort != "") {
-    backendPort <- existingPort
-  } else {
-    stop("No existing backend port found!")
-  }
-
-  # Connect to the backend service
-  .sparkREnv$backendPort <- backendPort
-  tryCatch({
-    connectBackend("localhost", backendPort)
-  }, error = function(err) {
-    stop("Failed to connect to JVM: ", err)
-  })
-
-  # Set the start time to identify jobjs
-  # Seconds resolution is good enough for this purpose, so use ints
-  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
-
-  # Register a finalizer to sleep 1 second on R exit to make RStudio happy
-  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
-}
-
-#' Initialize a new SQLContext.
-#'
-#' This function creates a SQLContext from an existing JavaSparkContext.
-#'
-#' @param jsc The existing JavaSparkContext created with sparkR.init()
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlContext <- sparkRSQL.init(sc)
-#'}
-
-sparkRSQL.init <- function(jsc = NULL) {
-  if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
-    return(get(".sparkRSQLsc", envir = .sparkREnv))
-  }
-
-  # If jsc is NULL, create a Spark Context
-  sc <- if (is.null(jsc)) {
-    sparkR.init()
-  } else {
-    jsc
-  }
-
-  sqlContext <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-                            "createSQLContext",
-                            sc)
-  assign(".sparkRSQLsc", sqlContext, envir = .sparkREnv)
-  sqlContext
-}
-
-#' Initialize a new HiveContext.
-#'
-#' This function creates a HiveContext from an existing JavaSparkContext.
-#'
-#' @param jsc The existing JavaSparkContext created with sparkR.init()
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlContext <- sparkRHive.init(sc)
-#'}
-
-sparkRHive.init <- function(jsc = NULL) {
-  if (exists(".sparkRHivesc", envir = .sparkREnv)) {
-    return(get(".sparkRHivesc", envir = .sparkREnv))
-  }
-
-  # If jsc is NULL, create a Spark Context
-  sc <- if (is.null(jsc)) {
-    sparkR.init()
-  } else {
-    jsc
-  }
-
-  ssc <- callJMethod(sc, "sc")
-  hiveCtx <- tryCatch({
-    newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
-  },
-  error = function(err) {
-    stop("Spark SQL is not built with Hive support")
-  })
-
-  assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
-  hiveCtx
-}
-
-#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
-#' different value or cleared.
-#'
-#' @param sc existing spark context
-#' @param groupId the ID to be assigned to job groups
-#' @param description description for the job group ID
-#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
-#'}
-
-setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
-  callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
-}
-
-#' Clear current job group ID and its description
-#'
-#' @param sc existing spark context
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' clearJobGroup(sc)
-#'}
-
-clearJobGroup <- function(sc) {
-  callJMethod(sc, "clearJobGroup")
-}
-
-#' Cancel active jobs for the specified group
-#'
-#' @param sc existing spark context
-#' @param groupId the ID of the job group to be cancelled
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' cancelJobGroup(sc, "myJobGroup")
-#'}
-
-cancelJobGroup <- function(sc, groupId) {
-  callJMethod(sc, "cancelJobGroup", groupId)
-}
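For context, the sparkR.R removed above provided the whole session lifecycle that the remaining deleted files exercise. A minimal sketch of how those entry points were meant to fit together, assuming a Spark 1.x-era install with SPARK_HOME set (the names and values below are illustrative only, not from the diff):

library(SparkR)

# Launch the backend JVM and create a JavaSparkContext, or reuse an
# existing one until sparkR.stop() is called.
sc <- sparkR.init(master = "local[2]", appName = "SparkRExample")
sqlContext <- sparkRSQL.init(sc)

# Group the jobs submitted by this thread so they can be cancelled together.
setJobGroup(sc, "myJobGroup", "illustrative job group", TRUE)
rdd <- parallelize(sc, 1:100, 2L)
count(rdd)
cancelJobGroup(sc, "myJobGroup")
clearJobGroup(sc)

# Tear down the context, the backend connection, and any broadcast state.
sparkR.stop()
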
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R b/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R deleted file mode 100644 index 3babcb5..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R +++ /dev/null @@ -1,600 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Utilities and Helpers - -# Given a JList<T>, returns an R list containing the same elements, the number -# of which is optionally upper bounded by `logicalUpperBound` (by default, -# return all elements). Takes care of deserializations and type conversions. -convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL, - serializedMode = "byte") { - arrSize <- callJMethod(jList, "size") - - # Datasets with serializedMode == "string" (such as an RDD directly generated by textFile()): - # each partition is not dense-packed into one Array[Byte], and `arrSize` - # here corresponds to number of logical elements. Thus we can prune here. - if (serializedMode == "string" && !is.null(logicalUpperBound)) { - arrSize <- min(arrSize, logicalUpperBound) - } - - results <- if (arrSize > 0) { - lapply(0 : (arrSize - 1), - function(index) { - obj <- callJMethod(jList, "get", as.integer(index)) - - # Assume it is either an R object or a Java obj ref. - if (inherits(obj, "jobj")) { - if (isInstanceOf(obj, "scala.Tuple2")) { - # JavaPairRDD[Array[Byte], Array[Byte]]. - - keyBytes <- callJMethod(obj, "_1") - valBytes <- callJMethod(obj, "_2") - res <- list(unserialize(keyBytes), - unserialize(valBytes)) - } else { - stop(paste("utils.R: convertJListToRList only supports", - "RDD[Array[Byte]] and", - "JavaPairRDD[Array[Byte], Array[Byte]] for now")) - } - } else { - if (inherits(obj, "raw")) { - if (serializedMode == "byte") { - # RDD[Array[Byte]]. `obj` is a whole partition. - res <- unserialize(obj) - # For serialized datasets, `obj` (and `rRaw`) here corresponds to - # one whole partition dense-packed together. We deserialize the - # whole partition first, then cap the number of elements to be returned. - } else if (serializedMode == "row") { - res <- readRowList(obj) - # For DataFrames that have been converted to RRDDs, we call readRowList - # which will read in each row of the RRDD as a list and deserialize - # each element. - flatten <<- FALSE - # Use global assignment to change the flatten flag. This means - # we don't have to worry about the default argument in other functions - # e.g. collect - } - # TODO: is it possible to distinguish element boundary so that we can - # unserialize only what we need? 
-          if (!is.null(logicalUpperBound)) {
-            res <- head(res, n = logicalUpperBound)
-          }
-        } else {
-          # obj is of a primitive Java type and is simplified to the
-          # corresponding R type.
-          res <- list(obj)
-        }
-      }
-      res
-    })
-  } else {
-    list()
-  }
-
-  if (flatten) {
-    as.list(unlist(results, recursive = FALSE))
-  } else {
-    as.list(results)
-  }
-}
-
-# Returns TRUE if `name` refers to an RDD in the given environment `env`
-isRDD <- function(name, env) {
-  obj <- get(name, envir = env)
-  inherits(obj, "RDD")
-}
-
-#' Compute the hashCode of an object
-#'
-#' Java-style function to compute the hashCode for the given object. Returns
-#' an integer value.
-#'
-#' @details
-#' This only works for integer, numeric and character types right now.
-#'
-#' @param key the object to be hashed
-#' @return the hash code as an integer
-#' @export
-#' @examples
-#' hashCode(1L) # 1
-#' hashCode(1.0) # 1072693248
-#' hashCode("1") # 49
-hashCode <- function(key) {
-  if (class(key) == "integer") {
-    as.integer(key[[1]])
-  } else if (class(key) == "numeric") {
-    # Convert the double to long and then calculate the hash code
-    rawVec <- writeBin(key[[1]], con = raw())
-    intBits <- packBits(rawToBits(rawVec), "integer")
-    as.integer(bitwXor(intBits[2], intBits[1]))
-  } else if (class(key) == "character") {
-    # TODO: SPARK-7839 means we might not have the native library available
-    if (is.loaded("stringHashCode")) {
-      .Call("stringHashCode", key)
-    } else {
-      n <- nchar(key)
-      if (n == 0) {
-        0L
-      } else {
-        asciiVals <- sapply(charToRaw(key), function(x) { strtoi(x, 16L) })
-        hashC <- 0
-        for (k in 1:length(asciiVals)) {
-          hashC <- mult31AndAdd(hashC, asciiVals[k])
-        }
-        as.integer(hashC)
-      }
-    }
-  } else {
-    warning("Could not hash object, returning 0")
-    as.integer(0)
-  }
-}
-
-# Helper function used to wrap a 'numeric' value to integer bounds.
-# Useful for implementing C-like integer arithmetic
-wrapInt <- function(value) {
-  if (value > .Machine$integer.max) {
-    value <- value - 2 * .Machine$integer.max - 2
-  } else if (value < -1 * .Machine$integer.max) {
-    value <- 2 * .Machine$integer.max + value + 2
-  }
-  value
-}
-
-# Multiply `val` by 31 and add `addVal` to the result. Ensures that
-# integer-overflows are handled at every step.
-mult31AndAdd <- function(val, addVal) {
-  vec <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
-  Reduce(function(a, b) {
-           wrapInt(as.numeric(a) + as.numeric(b))
-         },
-         vec)
-}
-
-# Create a new RDD with serializedMode == "byte".
-# Return itself if already in "byte" format.
-serializeToBytes <- function(rdd) {
-  if (!inherits(rdd, "RDD")) {
-    stop("Argument 'rdd' is not an RDD type.")
-  }
-  if (getSerializedMode(rdd) != "byte") {
-    ser.rdd <- lapply(rdd, function(x) { x })
-    return(ser.rdd)
-  } else {
-    return(rdd)
-  }
-}
-
-# Create a new RDD with serializedMode == "string".
-# Return itself if already in "string" format.
-serializeToString <- function(rdd) {
-  if (!inherits(rdd, "RDD")) {
-    stop("Argument 'rdd' is not an RDD type.")
-  }
-  if (getSerializedMode(rdd) != "string") {
-    ser.rdd <- lapply(rdd, function(x) { toString(x) })
-    # force it to create jrdd using "string"
-    getJRDD(ser.rdd, serializedMode = "string")
-    return(ser.rdd)
-  } else {
-    return(rdd)
-  }
-}
-
-# Fast append to list by using an accumulator.
-# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
-#
-# The accumulator should have three fields: size, counter and data.
-# This function amortizes the allocation cost by doubling
-# the size of the list every time it fills up.
-addItemToAccumulator <- function(acc, item) {
-  if (acc$counter == acc$size) {
-    acc$size <- acc$size * 2
-    length(acc$data) <- acc$size
-  }
-  acc$counter <- acc$counter + 1
-  acc$data[[acc$counter]] <- item
-}
-
-initAccumulator <- function() {
-  acc <- new.env()
-  acc$counter <- 0
-  acc$data <- list(NULL)
-  acc$size <- 1
-  acc
-}
-
-# Utility function to sort a list of key value pairs
-# Used in unit tests
-sortKeyValueList <- function(kv_list, decreasing = FALSE) {
-  keys <- sapply(kv_list, function(x) x[[1]])
-  kv_list[order(keys, decreasing = decreasing)]
-}
-
-# Utility function to generate compact R lists from a grouped RDD
-# Used in Join-family functions
-# param:
-#   tagged_list R list generated via groupByKey with tags (1L, 2L, ...)
-#   cnull Boolean list where each element determines whether the corresponding list should
-#         be converted to list(NULL)
-genCompactLists <- function(tagged_list, cnull) {
-  len <- length(tagged_list)
-  lists <- list(vector("list", len), vector("list", len))
-  index <- list(1, 1)
-
-  for (x in tagged_list) {
-    tag <- x[[1]]
-    idx <- index[[tag]]
-    lists[[tag]][[idx]] <- x[[2]]
-    index[[tag]] <- idx + 1
-  }
-
-  len <- lapply(index, function(x) x - 1)
-  for (i in (1:2)) {
-    if (cnull[[i]] && len[[i]] == 0) {
-      lists[[i]] <- list(NULL)
-    } else {
-      length(lists[[i]]) <- len[[i]]
-    }
-  }
-
-  lists
-}
-
-# Utility function to merge compact R lists
-# Used in Join-family functions
-# param:
-#   left/right Two compact lists ready for Cartesian product
-mergeCompactLists <- function(left, right) {
-  result <- list()
-  length(result) <- length(left) * length(right)
-  index <- 1
-  for (i in left) {
-    for (j in right) {
-      result[[index]] <- list(i, j)
-      index <- index + 1
-    }
-  }
-  result
-}
-
-# Utility function to wrap the two operations above
-# Used in Join-family functions
-# param (same as genCompactLists):
-#   tagged_list R list generated via groupByKey with tags (1L, 2L, ...)
-#   cnull Boolean list where each element determines whether the corresponding list should
-#         be converted to list(NULL)
-joinTaggedList <- function(tagged_list, cnull) {
-  lists <- genCompactLists(tagged_list, cnull)
-  mergeCompactLists(lists[[1]], lists[[2]])
-}
-
-# Utility function to reduce a key-value list with a predicate
-# Used in *ByKey functions
-# param
-#   pair key-value pair
-#   keys/vals envs of keys/values, indexed by hash
-#   updateOrCreatePred predicate function
-#   updateFn update or merge function for an existing pair, similar to `mergeVal` in combineByKey()
-#   createFn create function for a new pair, similar to `createCombiner` in combineByKey()
-updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
-  # assume the hash value is bound to `$hash`, and key/val sit at index 1/2
-  hashVal <- pair$hash
-  key <- pair[[1]]
-  val <- pair[[2]]
-  if (updateOrCreatePred(pair)) {
-    assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
-  } else {
-    assign(hashVal, do.call(createFn, list(val)), envir = vals)
-    assign(hashVal, key, envir = keys)
-  }
-}
-
-# Utility function to convert the key and value envs into a key-value list
-convertEnvsToList <- function(keys, vals) {
-  lapply(ls(keys),
-         function(name) {
-           list(keys[[name]], vals[[name]])
-         })
-}
-
-# Utility function to capture the varargs into an environment object
-varargsToEnv <- function(...) {
-  # Based on http://stackoverflow.com/a/3057419/4577954
-  pairs <- list(...)
- env <- new.env() - for (name in names(pairs)) { - env[[name]] <- pairs[[name]] - } - env -} - -getStorageLevel <- function(newLevel = c("DISK_ONLY", - "DISK_ONLY_2", - "MEMORY_AND_DISK", - "MEMORY_AND_DISK_2", - "MEMORY_AND_DISK_SER", - "MEMORY_AND_DISK_SER_2", - "MEMORY_ONLY", - "MEMORY_ONLY_2", - "MEMORY_ONLY_SER", - "MEMORY_ONLY_SER_2", - "OFF_HEAP")) { - match.arg(newLevel) - storageLevelClass <- "org.apache.spark.storage.StorageLevel" - storageLevel <- switch(newLevel, - "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"), - "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"), - "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"), - "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"), - "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass, - "MEMORY_AND_DISK_SER"), - "MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass, - "MEMORY_AND_DISK_SER_2"), - "MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"), - "MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"), - "MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"), - "MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"), - "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP")) -} - -# Utility function for functions where an argument needs to be integer but we want to allow -# the user to type (for example) `5` instead of `5L` to avoid a confusing error message. -numToInt <- function(num) { - if (as.integer(num) != num) { - warning(paste("Coercing", as.list(sys.call())[[2]], "to integer.")) - } - as.integer(num) -} - -# create a Seq in JVM -toSeq <- function(...) { - callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", list(...)) -} - -# create a Seq in JVM from a list -listToSeq <- function(l) { - callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", l) -} - -# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a -# user defined function (UDF), and to examine variables in the UDF to decide -# if their values should be included in the new function environment. -# param -# node The current AST node in the traversal. -# oldEnv The original function environment. -# defVars An Accumulator of variables names defined in the function's calling environment, -# including function argument and local variable names. -# checkedFunc An environment of function objects examined during cleanClosure. It can -# be considered as a "name"-to-"list of functions" mapping. -# newEnv A new function environment to store necessary function dependencies, an output argument. -processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) { - nodeLen <- length(node) - - if (nodeLen > 1 && typeof(node) == "language") { - # Recursive case: current AST node is an internal node, check for its children. - if (length(node[[1]]) > 1) { - for (i in 1:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) - } - } else { - # if node[[1]] is length of 1, check for some R special functions. - nodeChar <- as.character(node[[1]]) - if (nodeChar == "{" || nodeChar == "(") { - # Skip start symbol. - for (i in 2:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) - } - } else if (nodeChar == "<-" || nodeChar == "=" || - nodeChar == "<<-") { - # Assignment Ops. - defVar <- node[[2]] - if (length(defVar) == 1 && typeof(defVar) == "symbol") { - # Add the defined variable name into defVars. 
-          addItemToAccumulator(defVars, as.character(defVar))
-        } else {
-          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
-        }
-        for (i in 3:nodeLen) {
-          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
-        }
-      } else if (nodeChar == "function") {
-        # Function definition.
-        # Add parameter names.
-        newArgs <- names(node[[2]])
-        lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
-        for (i in 3:nodeLen) {
-          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
-        }
-      } else if (nodeChar == "$") {
-        # Skip the field.
-        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
-      } else if (nodeChar == "::" || nodeChar == ":::") {
-        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
-      } else {
-        for (i in 1:nodeLen) {
-          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
-        }
-      }
-    }
-  } else if (nodeLen == 1 &&
-             (typeof(node) == "symbol" || typeof(node) == "language")) {
-    # Base case: current AST node is a leaf node and a symbol or a function call.
-    nodeChar <- as.character(node)
-    if (!nodeChar %in% defVars$data) {
-      # Not a function parameter or local variable.
-      func.env <- oldEnv
-      topEnv <- parent.env(.GlobalEnv)
-      # Search the function's environment and its enclosing environments up to
-      # the global environment. There is no need to look into package environments
-      # above the global environment, or into non-SparkR namespace environments
-      # below it, as they are assumed to be loaded on workers.
-      while (!identical(func.env, topEnv)) {
-        # Namespaces other than "SparkR" will not be searched.
-        if (!isNamespace(func.env) ||
-            (getNamespaceName(func.env) == "SparkR" &&
-               !(nodeChar %in% getNamespaceExports("SparkR")))) {
-          # Only include SparkR internals.
-
-          # Set parameter 'inherits' to FALSE since we do not need to search in
-          # attached package environments.
-          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
-                       error = function(e) { FALSE })) {
-            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
-            if (is.function(obj)) {
-              # The node is a function call.
-              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = FALSE,
-                               ifnotfound = list(list(NULL)))[[1]]
-              found <- sapply(funcList, function(func) {
-                identical(func, obj)
-              })
-              if (sum(found) > 0) {
-                # The function has already been examined; ignore it.
-                break
-              }
-              # Function has not been examined, record it and recursively clean its closure.
-              assign(nodeChar,
-                     if (is.null(funcList[[1]])) {
-                       list(obj)
-                     } else {
-                       append(funcList, obj)
-                     },
-                     envir = checkedFuncs)
-              obj <- cleanClosure(obj, checkedFuncs)
-            }
-            assign(nodeChar, obj, envir = newEnv)
-            break
-          }
-        }
-
-        # Continue to search in the enclosing environment.
-        func.env <- parent.env(func.env)
-      }
-    }
-  }
-}
-
-# Utility function to get user defined function (UDF) dependencies (closure).
-# More specifically, this function captures the values of free variables defined
-# outside a UDF, and stores them in the function's environment.
-# param
-#   func A function whose closure needs to be captured.
-#   checkedFuncs An environment of function objects examined during cleanClosure. It can be
-#                considered as a "name"-to-"list of functions" mapping.
-# return value
-#   a new version of func that has a correct environment (closure).
-cleanClosure <- function(func, checkedFuncs = new.env()) { - if (is.function(func)) { - newEnv <- new.env(parent = .GlobalEnv) - func.body <- body(func) - oldEnv <- environment(func) - # defVars is an Accumulator of variables names defined in the function's calling - # environment. First, function's arguments are added to defVars. - defVars <- initAccumulator() - argNames <- names(as.list(args(func))) - for (i in 1:(length(argNames) - 1)) { - # Remove the ending NULL in pairlist. - addItemToAccumulator(defVars, argNames[i]) - } - # Recursively examine variables in the function body. - processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv) - environment(func) <- newEnv - } - func -} - -# Append partition lengths to each partition in two input RDDs if needed. -# param -# x An RDD. -# Other An RDD. -# return value -# A list of two result RDDs. -appendPartitionLengths <- function(x, other) { - if (getSerializedMode(x) != getSerializedMode(other) || - getSerializedMode(x) == "byte") { - # Append the number of elements in each partition to that partition so that we can later - # know the boundary of elements from x and other. - # - # Note that this appending also serves the purpose of reserialization, because even if - # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded - # as a single byte array. For example, partitions of an RDD generated from partitionBy() - # may be encoded as multiple byte arrays. - appendLength <- function(part) { - len <- length(part) - part[[len + 1]] <- len + 1 - part - } - x <- lapplyPartition(x, appendLength) - other <- lapplyPartition(other, appendLength) - } - list (x, other) -} - -# Perform zip or cartesian between elements from two RDDs in each partition -# param -# rdd An RDD. -# zip A boolean flag indicating this call is for zip operation or not. -# return value -# A result RDD. -mergePartitions <- function(rdd, zip) { - serializerMode <- getSerializedMode(rdd) - partitionFunc <- function(partIndex, part) { - len <- length(part) - if (len > 0) { - if (serializerMode == "byte") { - lengthOfValues <- part[[len]] - lengthOfKeys <- part[[len - lengthOfValues]] - stopifnot(len == lengthOfKeys + lengthOfValues) - - # For zip operation, check if corresponding partitions - # of both RDDs have the same number of elements. 
- if (zip && lengthOfKeys != lengthOfValues) { - stop(paste("Can only zip RDDs with same number of elements", - "in each pair of corresponding partitions.")) - } - - if (lengthOfKeys > 1) { - keys <- part[1 : (lengthOfKeys - 1)] - } else { - keys <- list() - } - if (lengthOfValues > 1) { - values <- part[ (lengthOfKeys + 1) : (len - 1) ] - } else { - values <- list() - } - - if (!zip) { - return(mergeCompactLists(keys, values)) - } - } else { - keys <- part[c(TRUE, FALSE)] - values <- part[c(FALSE, TRUE)] - } - mapply( - function(k, v) { list(k, v) }, - keys, - values, - SIMPLIFY = FALSE, - USE.NAMES = FALSE) - } else { - part - } - } - - PipelinedRDD(rdd, partitionFunc) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R deleted file mode 100644 index 2a8a821..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -.First <- function() { - packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR") - .libPaths(c(packageDir, .libPaths())) - Sys.setenv(NOAWT=1) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R deleted file mode 100644 index 7189f1a..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R +++ /dev/null @@ -1,47 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -.First <- function() { - home <- Sys.getenv("SPARK_HOME") - .libPaths(c(file.path(home, "R", "lib"), .libPaths())) - Sys.setenv(NOAWT=1) - - # Make sure SparkR package is the last loaded one - old <- getOption("defaultPackages") - options(defaultPackages = c(old, "SparkR")) - - sc <- SparkR::sparkR.init() - assign("sc", sc, envir=.GlobalEnv) - sqlContext <- SparkR::sparkRSQL.init(sc) - sparkVer <- SparkR:::callJMethod(sc, "version") - assign("sqlContext", sqlContext, envir=.GlobalEnv) - cat("\n Welcome to") - cat("\n") - cat(" ____ __", "\n") - cat(" / __/__ ___ _____/ /__", "\n") - cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n") - cat(" /___/ .__/\\_,_/_/ /_/\\_\\") - if (nchar(sparkVer) == 0) { - cat("\n") - } else { - cat(" version ", sparkVer, "\n") - } - cat(" /_/", "\n") - cat("\n") - - cat("\n Spark context is available as sc, SQL context is available as sqlContext\n") -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar b/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar deleted file mode 100644 index 1d5c2af..0000000 Binary files a/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R deleted file mode 100644 index d68bb20..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R +++ /dev/null @@ -1,32 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -library(SparkR) - -sc <- sparkR.init() - -helloTest <- SparkR:::callJStatic("sparkR.test.hello", - "helloWorld", - "Dave") - -basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction", - "addStuff", - 2L, - 2L) - -sparkR.stop() -output <- c(helloTest, basicFunction) -writeLines(output) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R deleted file mode 100644 index 207a37a..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R +++ /dev/null @@ -1,30 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -library(SparkR) -library(sparkPackageTest) - -sc <- sparkR.init() - -run1 <- myfunc(5L) - -run2 <- myfunc(-4L) - -sparkR.stop() - -if(run1 != 6) quit(save = "no", status = 1) - -if(run2 != -3) quit(save = "no", status = 1) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R deleted file mode 100644 index f2452ed..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R +++ /dev/null @@ -1,89 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -context("functions on binary files") - -# JavaSparkContext handle -sc <- sparkR.init() - -mockFile <- c("Spark is pretty.", "Spark is awesome.") - -test_that("saveAsObjectFile()/objectFile() following textFile() works", { - fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") - fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") - writeLines(mockFile, fileName1) - - rdd <- textFile(sc, fileName1, 1) - saveAsObjectFile(rdd, fileName2) - rdd <- objectFile(sc, fileName2) - expect_equal(collect(rdd), as.list(mockFile)) - - unlink(fileName1) - unlink(fileName2, recursive = TRUE) -}) - -test_that("saveAsObjectFile()/objectFile() works on a parallelized list", { - fileName <- tempfile(pattern="spark-test", fileext=".tmp") - - l <- list(1, 2, 3) - rdd <- parallelize(sc, l, 1) - saveAsObjectFile(rdd, fileName) - rdd <- objectFile(sc, fileName) - expect_equal(collect(rdd), l) - - unlink(fileName, recursive = TRUE) -}) - -test_that("saveAsObjectFile()/objectFile() following RDD transformations works", { - fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") - fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") - writeLines(mockFile, fileName1) - - rdd <- textFile(sc, fileName1) - - words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) - wordCount <- lapply(words, function(word) { list(word, 1L) }) - - counts <- reduceByKey(wordCount, "+", 2L) - - saveAsObjectFile(counts, fileName2) - counts <- objectFile(sc, fileName2) - - output <- collect(counts) - expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1), - list("is", 2)) - expect_equal(sortKeyValueList(output), sortKeyValueList(expected)) - - unlink(fileName1) - unlink(fileName2, recursive = TRUE) -}) - -test_that("saveAsObjectFile()/objectFile() works with multiple paths", { - fileName1 <- tempfile(pattern="spark-test", fileext=".tmp") - fileName2 <- tempfile(pattern="spark-test", fileext=".tmp") - - rdd1 <- parallelize(sc, "Spark is pretty.") - saveAsObjectFile(rdd1, fileName1) - rdd2 <- parallelize(sc, "Spark is awesome.") - saveAsObjectFile(rdd2, fileName2) - - rdd <- objectFile(sc, c(fileName1, fileName2)) - expect_equal(count(rdd), 2) - - unlink(fileName1, recursive = TRUE) - unlink(fileName2, recursive = TRUE) -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R deleted file mode 100644 index f054ac9..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R +++ /dev/null @@ -1,101 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("binary functions") - -# JavaSparkContext handle -sc <- sparkR.init() - -# Data -nums <- 1:10 -rdd <- parallelize(sc, nums, 2L) - -# File content -mockFile <- c("Spark is pretty.", "Spark is awesome.") - -test_that("union on two RDDs", { - actual <- collect(unionRDD(rdd, rdd)) - expect_equal(actual, as.list(rep(nums, 2))) - - fileName <- tempfile(pattern="spark-test", fileext=".tmp") - writeLines(mockFile, fileName) - - text.rdd <- textFile(sc, fileName) - union.rdd <- unionRDD(rdd, text.rdd) - actual <- collect(union.rdd) - expect_equal(actual, c(as.list(nums), mockFile)) - expect_equal(getSerializedMode(union.rdd), "byte") - - rdd <- map(text.rdd, function(x) {x}) - union.rdd <- unionRDD(rdd, text.rdd) - actual <- collect(union.rdd) - expect_equal(actual, as.list(c(mockFile, mockFile))) - expect_equal(getSerializedMode(union.rdd), "byte") - - unlink(fileName) -}) - -test_that("cogroup on two RDDs", { - rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) - rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) - cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L) - actual <- collect(cogroup.rdd) - expect_equal(actual, - list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))) - - rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4))) - rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3))) - cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L) - actual <- collect(cogroup.rdd) - - expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3)))) - expect_equal(sortKeyValueList(actual), - sortKeyValueList(expected)) -}) - -test_that("zipPartitions() on RDDs", { - rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2 - rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4 - rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6 - actual <- collect(zipPartitions(rdd1, rdd2, rdd3, - func = function(x, y, z) { list(list(x, y, z))} )) - expect_equal(actual, - list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))) - - mockFile <- c("Spark is pretty.", "Spark is awesome.") - fileName <- tempfile(pattern="spark-test", fileext=".tmp") - writeLines(mockFile, fileName) - - rdd <- textFile(sc, fileName, 1) - actual <- collect(zipPartitions(rdd, rdd, - func = function(x, y) { list(paste(x, y, sep = "\n")) })) - expected <- list(paste(mockFile, mockFile, sep = "\n")) - expect_equal(actual, expected) - - rdd1 <- parallelize(sc, 0:1, 1) - actual <- collect(zipPartitions(rdd1, rdd, - func = function(x, y) { list(x + nchar(y)) })) - expected <- list(0:1 + nchar(mockFile)) - expect_equal(actual, expected) - - rdd <- map(rdd, function(x) { x }) - actual <- collect(zipPartitions(rdd, rdd1, - func = function(x, y) { list(y + nchar(x)) })) - expect_equal(actual, expected) - - unlink(fileName) -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R deleted file mode 100644 index bb86a5c..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R +++ /dev/null @@ -1,48 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("broadcast variables") - -# JavaSparkContext handle -sc <- sparkR.init() - -# Partitioned data -nums <- 1:2 -rrdd <- parallelize(sc, nums, 2L) - -test_that("using broadcast variable", { - randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100)) - randomMatBr <- broadcast(sc, randomMat) - - useBroadcast <- function(x) { - sum(SparkR:::value(randomMatBr) * x) - } - actual <- collect(lapply(rrdd, useBroadcast)) - expected <- list(sum(randomMat) * 1, sum(randomMat) * 2) - expect_equal(actual, expected) -}) - -test_that("without using broadcast variable", { - randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100)) - - useBroadcast <- function(x) { - sum(randomMat * x) - } - actual <- collect(lapply(rrdd, useBroadcast)) - expected <- list(sum(randomMat) * 1, sum(randomMat) * 2) - expect_equal(actual, expected) -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R deleted file mode 100644 index 8a20991..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R +++ /dev/null @@ -1,36 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -context("functions in client.R") - -test_that("adding spark-testing-base as a package works", { - args <- generateSparkSubmitArgs("", "", "", "", - "holdenk:spark-testing-base:1.3.0_0.0.5") - expect_equal(gsub("[[:space:]]", "", args), - gsub("[[:space:]]", "", - "--packages holdenk:spark-testing-base:1.3.0_0.0.5")) -}) - -test_that("no package specified doesn't add packages flag", { - args <- generateSparkSubmitArgs("", "", "", "", "") - expect_equal(gsub("[[:space:]]", "", args), - "") -}) - -test_that("multiple packages don't produce a warning", { - expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning())) -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R deleted file mode 100644 index 513bbc8..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R +++ /dev/null @@ -1,57 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -context("test functions in sparkR.R") - -test_that("repeatedly starting and stopping SparkR", { - for (i in 1:4) { - sc <- sparkR.init() - rdd <- parallelize(sc, 1:20, 2L) - expect_equal(count(rdd), 20) - sparkR.stop() - } -}) - -test_that("rdd GC across sparkR.stop", { - sparkR.stop() - sc <- sparkR.init() # sc should get id 0 - rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1 - rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2 - sparkR.stop() - - sc <- sparkR.init() # sc should get id 0 again - - # GC rdd1 before creating rdd3 and rdd2 after - rm(rdd1) - gc() - - rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now - rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now - - rm(rdd2) - gc() - - count(rdd3) - count(rdd4) -}) - -test_that("job group functions can be called", { - sc <- sparkR.init() - setJobGroup(sc, "groupId", "job description", TRUE) - cancelJobGroup(sc, "groupId") - clearJobGroup(sc) -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R deleted file mode 100644 index cc1faea..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -context("include an external JAR in SparkContext") - -runScript <- function() { - sparkHome <- Sys.getenv("SPARK_HOME") - sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar" - jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath))) - scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R") - submitPath <- file.path(sparkHome, "bin/spark-submit") - res <- system2(command = submitPath, - args = c(jarPath, scriptPath), - stdout = TRUE) - tail(res, 2) -} - -test_that("sparkJars tag in SparkContext", { - testOutput <- runScript() - helloTest <- testOutput[1] - expect_equal(helloTest, "Hello, Dave") - basicFunction <- testOutput[2] - expect_equal(basicFunction, "4") -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R deleted file mode 100644 index 8152b44..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R +++ /dev/null @@ -1,57 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("include R packages") - -# JavaSparkContext handle -sc <- sparkR.init() - -# Partitioned data -nums <- 1:2 -rdd <- parallelize(sc, nums, 2L) - -test_that("include inside function", { - # Only run the test if plyr is installed. - if ("plyr" %in% rownames(installed.packages())) { - suppressPackageStartupMessages(library(plyr)) - generateData <- function(x) { - suppressPackageStartupMessages(library(plyr)) - attach(airquality) - result <- transform(Ozone, logOzone = log(Ozone)) - result - } - - data <- lapplyPartition(rdd, generateData) - actual <- collect(data) - } -}) - -test_that("use include package", { - # Only run the test if plyr is installed. 
- if ("plyr" %in% rownames(installed.packages())) { - suppressPackageStartupMessages(library(plyr)) - generateData <- function(x) { - attach(airquality) - result <- transform(Ozone, logOzone = log(Ozone)) - result - } - - includePackage(sc, plyr) - data <- lapplyPartition(rdd, generateData) - actual <- collect(data) - } -}) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R deleted file mode 100644 index f272de7..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R +++ /dev/null @@ -1,61 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("MLlib functions") - -# Tests for MLlib functions in SparkR - -sc <- sparkR.init() - -sqlContext <- sparkRSQL.init(sc) - -test_that("glm and predict", { - training <- createDataFrame(sqlContext, iris) - test <- select(training, "Sepal_Length") - model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian") - prediction <- predict(model, test) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") -}) - -test_that("predictions match with native glm", { - training <- createDataFrame(sqlContext, iris) - model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("dot minus and intercept vs native glm", { - training <- createDataFrame(sqlContext, iris) - model <- glm(Sepal_Width ~ . - Species + 0, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . 
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R
deleted file mode 100644
index f272de7..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib functions")
-
-# Tests for MLlib functions in SparkR
-
-sc <- sparkR.init()
-
-sqlContext <- sparkRSQL.init(sc)
-
-test_that("glm and predict", {
-  training <- createDataFrame(sqlContext, iris)
-  test <- select(training, "Sepal_Length")
-  model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
-  prediction <- predict(model, test)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-})
-
-test_that("predictions match with native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("dot minus and intercept vs native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  model <- glm(Sepal_Width ~ . - Species + 0, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("summary coefficients match with native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
-  coefs <- as.vector(stats$coefficients)
-  rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)))
-  expect_true(all(abs(rCoefs - coefs) < 1e-6))
-  expect_true(all(
-    as.character(stats$features) ==
-      c("(Intercept)", "Sepal_Length", "Species__versicolor", "Species__virginica")))
-})
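Each of the four deleted tests follows one recipe: fit a model on a SparkR DataFrame, fit the same formula on the local data.frame with native glm, and require agreement within 1e-6. Note the column names: the tests use Sepal_Width where base R has Sepal.Width, reflecting how createDataFrame renames dotted columns with underscores. The core of the comparison, as a sketch built from the same calls the tests use:

sqlContext <- sparkRSQL.init(sc)
training <- createDataFrame(sqlContext, iris)

# SparkR model over the DataFrame (underscored column names) ...
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
vals <- collect(select(predict(model, training), "prediction"))$prediction

# ... against R's native glm over the original data.frame.
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)

stopifnot(all(abs(rVals - vals) < 1e-6))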
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R
deleted file mode 100644
index 2552127..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("parallelize() and collect()")
-
-# Mock data
-numVector <- c(-10:97)
-numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
-strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
-               "violated, but I'm not. No, in fact, I think this is a friendly",
-               "message, like \"Hey, wanna play?\" and yes, I want to play. ",
-               "I really, really do.")
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
-                "other times it helps me control the chaos.",
-                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
-                "raising me. But they're both dead now. I didn't kill them. Honest.")
-
-numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
-strPairs <- list(list(strList, strList), list(strList, strList))
-
-# JavaSparkContext handle
-jsc <- sparkR.init()
-
-# Tests
-
-test_that("parallelize() on simple vectors and lists returns an RDD", {
-  numVectorRDD <- parallelize(jsc, numVector, 1)
-  numVectorRDD2 <- parallelize(jsc, numVector, 10)
-  numListRDD <- parallelize(jsc, numList, 1)
-  numListRDD2 <- parallelize(jsc, numList, 4)
-  strVectorRDD <- parallelize(jsc, strVector, 2)
-  strVectorRDD2 <- parallelize(jsc, strVector, 3)
-  strListRDD <- parallelize(jsc, strList, 4)
-  strListRDD2 <- parallelize(jsc, strList, 1)
-
-  rdds <- c(numVectorRDD,
-            numVectorRDD2,
-            numListRDD,
-            numListRDD2,
-            strVectorRDD,
-            strVectorRDD2,
-            strListRDD,
-            strListRDD2)
-
-  for (rdd in rdds) {
-    expect_is(rdd, "RDD")
-    expect_true(.hasSlot(rdd, "jrdd")
-                && inherits(rdd@jrdd, "jobj")
-                && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
-  }
-})
-
-test_that("collect(), following a parallelize(), gives back the original collections", {
-  numVectorRDD <- parallelize(jsc, numVector, 10)
-  expect_equal(collect(numVectorRDD), as.list(numVector))
-
-  numListRDD <- parallelize(jsc, numList, 1)
-  numListRDD2 <- parallelize(jsc, numList, 4)
-  expect_equal(collect(numListRDD), as.list(numList))
-  expect_equal(collect(numListRDD2), as.list(numList))
-
-  strVectorRDD <- parallelize(jsc, strVector, 2)
-  strVectorRDD2 <- parallelize(jsc, strVector, 3)
-  expect_equal(collect(strVectorRDD), as.list(strVector))
-  expect_equal(collect(strVectorRDD2), as.list(strVector))
-
-  strListRDD <- parallelize(jsc, strList, 4)
-  strListRDD2 <- parallelize(jsc, strList, 1)
-  expect_equal(collect(strListRDD), as.list(strList))
-  expect_equal(collect(strListRDD2), as.list(strList))
-})
-
-test_that("regression: collect() following a parallelize() does not drop elements", {
-  # 10 %/% 6 = 1, ceiling(10 / 6) = 2
-  collLen <- 10
-  numPart <- 6
-  expected <- runif(collLen)
-  actual <- collect(parallelize(jsc, expected, numPart))
-  expect_equal(actual, as.list(expected))
-})
-
-test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
-  # use the pairwise logical to indicate pairwise data
-  numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
-  numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
-  numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
-  expect_equal(collect(numPairsRDDD1), numPairs)
-  expect_equal(collect(numPairsRDDD2), numPairs)
-  expect_equal(collect(numPairsRDDD3), numPairs)
-  # can also leave out the parameter name, if the params are supplied in order
-  strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
-  strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
-  expect_equal(collect(strPairsRDDD1), strPairs)
-  expect_equal(collect(strPairsRDDD2), strPairs)
-})
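One detail worth noting in all of these assertions: collect() always returns an R list, whatever type went into parallelize(), which is why every expected value above is wrapped in as.list(). A minimal sketch of the invariant the regression test pins down, using only calls shown in this file:

jsc <- sparkR.init()

# 10 elements over 6 partitions do not divide evenly (10 %/% 6 = 1,
# ceiling(10 / 6) = 2), yet nothing is dropped or reordered.
expected <- runif(10)
roundTrip <- collect(parallelize(jsc, expected, 6))

stopifnot(is.list(roundTrip))
stopifnot(isTRUE(all.equal(roundTrip, as.list(expected))))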