Repository: spark Updated Branches: refs/heads/master 3abc0d512 -> af0e3125c
[SPARK-8951] [SPARKR] support Unicode characters in collect() Spark gives an error message and does not show the output when a field of the result DataFrame contains CJK characters. I changed SerDe.scala so that Spark supports Unicode characters when it writes a string to R. Author: CHOIJAEHONG <[email protected]> Closes #7494 from CHOIJAEHONG1/SPARK-8951. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af0e3125 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af0e3125 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af0e3125 Branch: refs/heads/master Commit: af0e3125cb1d48b1fc0e44c42b6880d67a9f1a85 Parents: 3abc0d5 Author: CHOIJAEHONG <[email protected]> Authored: Thu Sep 3 13:38:26 2015 -0700 Committer: Shivaram Venkataraman <[email protected]> Committed: Thu Sep 3 13:38:26 2015 -0700 ---------------------------------------------------------------------- R/pkg/R/deserialize.R | 6 +++-- R/pkg/R/serialize.R | 2 +- R/pkg/inst/tests/test_sparkSQL.R | 26 ++++++++++++++++++++ .../scala/org/apache/spark/api/r/SerDe.scala | 9 +++---- 4 files changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/af0e3125/R/pkg/R/deserialize.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 6cf628e..88f1861 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -57,8 +57,10 @@ readTypedObject <- function(con, type) { readString <- function(con) { stringLen <- readInt(con) - string <- readBin(con, raw(), stringLen, endian = "big") - rawToChar(string) + raw <- readBin(con, raw(), stringLen, endian = "big") + string <- rawToChar(raw) + Encoding(string) <- "UTF-8" + string } readInt <- function(con) { http://git-wip-us.apache.org/repos/asf/spark/blob/af0e3125/R/pkg/R/serialize.R 
---------------------------------------------------------------------- diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R index e3676f5..91e6b3e 100644 --- a/R/pkg/R/serialize.R +++ b/R/pkg/R/serialize.R @@ -79,7 +79,7 @@ writeJobj <- function(con, value) { writeString <- function(con, value) { utfVal <- enc2utf8(value) writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1)) - writeBin(utfVal, con, endian = "big") + writeBin(utfVal, con, endian = "big", useBytes=TRUE) } writeInt <- function(con, value) { http://git-wip-us.apache.org/repos/asf/spark/blob/af0e3125/R/pkg/inst/tests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 0da5e38..6d331f9 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -431,6 +431,32 @@ test_that("collect() and take() on a DataFrame return the same number of rows an expect_equal(ncol(collect(df)), ncol(take(df, 10))) }) +test_that("collect() support Unicode characters", { + markUtf8 <- function(s) { + Encoding(s) <- "UTF-8" + s + } + + lines <- c("{\"name\":\"안녕하세요\"}", + "{\"name\":\"您好\", \"age\":30}", + "{\"name\":\"こんにちは\", \"age\":19}", + "{\"name\":\"Xin chào\"}") + + jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") + writeLines(lines, jsonPath) + + df <- read.df(sqlContext, jsonPath, "json") + rdf <- collect(df) + expect_true(is.data.frame(rdf)) + expect_equal(rdf$name[1], markUtf8("안녕하세요")) + expect_equal(rdf$name[2], markUtf8("您好")) + expect_equal(rdf$name[3], markUtf8("こんにちは")) + expect_equal(rdf$name[4], markUtf8("Xin chào")) + + df1 <- createDataFrame(sqlContext, rdf) + expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好")) +}) + test_that("multiple pipeline transformations result in an RDD with the correct values", { df <- jsonFile(sqlContext, jsonPath) first <- lapply(df, function(row) { 
http://git-wip-us.apache.org/repos/asf/spark/blob/af0e3125/core/src/main/scala/org/apache/spark/api/r/SerDe.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index 26ad4f1..190e193 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -329,12 +329,11 @@ private[spark] object SerDe { out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9) } - // NOTE: Only works for ASCII right now def writeString(out: DataOutputStream, value: String): Unit = { - val len = value.length - out.writeInt(len + 1) // For the \0 - out.writeBytes(value) - out.writeByte(0) + val utf8 = value.getBytes("UTF-8") + val len = utf8.length + out.writeInt(len) + out.write(utf8, 0, len) } def writeBytes(out: DataOutputStream, value: Array[Byte]): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
