This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7fb9f68 [SPARK-32799][R][SQL] Add allowMissingColumns to SparkR unionByName
7fb9f68 is described below
commit 7fb9f6884f5e085e97b60fe45055247c2d17245c
Author: zero323 <[email protected]>
AuthorDate: Mon Sep 21 09:39:34 2020 +0900
[SPARK-32799][R][SQL] Add allowMissingColumns to SparkR unionByName
### What changes were proposed in this pull request?
Add optional `allowMissingColumns` argument to SparkR `unionByName`.
### Why are the changes needed?
Feature parity.
### Does this PR introduce _any_ user-facing change?
`unionByName` supports `allowMissingColumns`.
### How was this patch tested?
Existing unit tests. New unit tests targeting this feature.
Closes #29813 from zero323/SPARK-32799.
Authored-by: zero323 <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
R/pkg/R/DataFrame.R | 14 ++++++++++++--
R/pkg/R/generics.R | 2 +-
R/pkg/tests/fulltests/test_sparkSQL.R | 13 +++++++++++++
python/pyspark/sql/dataframe.py | 9 ++++-----
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 8 ++++----
5 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 089e1f2..2ce5378 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2863,11 +2863,18 @@ setMethod("unionAll",
#' \code{UNION ALL} and \code{UNION DISTINCT} in SQL as column positions are not taken
#' into account. Input SparkDataFrames can have different data types in the schema.
#'
+#' When the parameter allowMissingColumns is `TRUE`, the set of column names
+#' in x and y can differ; missing columns will be filled as null.
+#' Further, the missing columns of x will be added at the end
+#' in the schema of the union result.
+#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
#' This function resolves columns by name (not by position).
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
+#' @param allowMissingColumns logical
+#' @param ... further arguments to be passed to or from other methods.
#' @return A SparkDataFrame containing the result of the union.
#' @family SparkDataFrame functions
#' @rdname unionByName
@@ -2880,12 +2887,15 @@ setMethod("unionAll",
#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
#' head(unionByName(df1, df2))
+#'
+#' df3 <- select(createDataFrame(mtcars), "carb")
+#' head(unionByName(df1, df3, allowMissingColumns = TRUE))
#' }
#' @note unionByName since 2.3.0
setMethod("unionByName",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
- function(x, y) {
- unioned <- callJMethod(x@sdf, "unionByName", y@sdf)
+ function(x, y, allowMissingColumns=FALSE) {
+ unioned <- callJMethod(x@sdf, "unionByName", y@sdf, allowMissingColumns)
dataFrame(unioned)
})
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 839c00c..a6a7166 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -638,7 +638,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
#' @rdname unionByName
-setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
-setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
+setGeneric("unionByName", function(x, y, ...) { standardGeneric("unionByName") })
#' @rdname unpersist
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index e008bc5..5008d30 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2696,6 +2696,19 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
expect_error(rbind(df, df2, df3),
"Names of input data frames are different.")
+
+ df4 <- unionByName(df2, select(df2, "age"), TRUE)
+
+ expect_equal(
+ sum(collect(
+ select(df4, alias(isNull(df4$name), "missing_name")
+ ))$missing_name),
+ 3
+ )
+
+ testthat::expect_error(unionByName(df2, select(df2, "age"), FALSE))
+ testthat::expect_error(unionByName(df2, select(df2, "age")))
+
excepted <- arrange(except(df, df2), desc(df$age))
expect_is(unioned, "SparkDataFrame")
expect_equal(count(excepted), 2)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index db2ddde..94a7df3 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1569,11 +1569,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
| 6| 4| 5|
+----+----+----+
- When the parameter `allowMissingColumns` is ``True``,
- this function allows different set of column names between two :class:`DataFrame`\\s.
- Missing columns at each side, will be filled with null values.
- The missing columns at left :class:`DataFrame` will be added at the end in the schema
- of the union result:
+ When the parameter `allowMissingColumns` is ``True``, the set of column names
+ in this and other :class:`DataFrame` can differ; missing columns will be filled with null.
+ Further, the missing columns of this :class:`DataFrame` will be added at the end
+ in the schema of the union result:
>>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
>>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col3"])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4cb923d..87b9aea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2038,10 +2038,10 @@ class Dataset[T] private[sql](
* The difference between this function and [[union]] is that this function
* resolves columns by name (not by position).
*
- * When the parameter `allowMissingColumns` is true, this function allows different set
- * of column names between two Datasets. Missing columns at each side, will be filled with
- * null values. The missing columns at left Dataset will be added at the end in the schema
- * of the union result:
+ * When the parameter `allowMissingColumns` is `true`, the set of column names
+ * in this and other `Dataset` can differ; missing columns will be filled with null.
+ * Further, the missing columns of this `Dataset` will be added at the end
+ * in the schema of the union result:
*
* {{{
* val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]