Repository: spark Updated Branches: refs/heads/branch-2.4 f91247f81 -> 3f203050a
[SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName ## What changes were proposed in this pull request? Add the legacy prefix to spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName. Note that the renamed conf has the inverse meaning: it defaults to true (columns are assigned by name when labeled with strings, falling back to position otherwise), and setting it to false restores assignment by position. ## How was this patch tested? The existing tests. Closes #22540 from gatorsmile/renameAssignColumnsByPosition. Lead-authored-by: gatorsmile <gatorsm...@gmail.com> Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> (cherry picked from commit 8c2edf46d0f89e5ec54968218d89f30a3f8190bc) Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f203050 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f203050 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f203050 Branch: refs/heads/branch-2.4 Commit: 3f203050ac764516e68fb43628bba0df5963e44d Parents: f91247f Author: gatorsmile <gatorsm...@gmail.com> Authored: Wed Sep 26 09:32:51 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Wed Sep 26 09:33:13 2018 +0800 ---------------------------------------------------------------------- python/pyspark/sql/tests.py | 3 ++- python/pyspark/worker.py | 7 ++++--- .../org/apache/spark/sql/internal/SQLConf.scala | 18 +++++++++--------- .../spark/sql/execution/arrow/ArrowUtils.scala | 9 +++------ 4 files changed, 18 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 9fa1577..cb186de 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5799,7 +5799,8 @@ 
class GroupedMapPandasUDFTests(ReusedSQLTestCase): import pandas as pd from pyspark.sql.functions import pandas_udf, PandasUDFType - with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}): + with self.sql_conf({ + "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}): @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP) def foo(_): http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/python/pyspark/worker.py ---------------------------------------------------------------------- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 974344f..8c59f1f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -97,8 +97,9 @@ def wrap_scalar_pandas_udf(f, return_type): def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): - assign_cols_by_pos = runner_conf.get( - "spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False) + assign_cols_by_name = runner_conf.get( + "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true") + assign_cols_by_name = assign_cols_by_name.lower() == "true" def wrapped(key_series, value_series): import pandas as pd @@ -119,7 +120,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) # Assign result columns by schema name if user labeled with strings, else use position - if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns): + if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns): return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] else: return [(result[result.columns[i]], to_arrow_type(field.dataType)) http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2788402..68daf9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1275,15 +1275,15 @@ object SQLConf { .booleanConf .createWithDefault(true) - val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION = - buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition") + val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME = + buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName") .internal() - .doc("When true, a grouped map Pandas UDF will assign columns from the returned " + - "Pandas DataFrame based on position, regardless of column label type. When false, " + - "columns will be looked up by name if labeled with a string and fallback to use " + - "position if not. This configuration will be deprecated in future releases.") + .doc("When true, columns will be looked up by name if labeled with a string and fallback " + + "to use position if not. When false, a grouped map Pandas UDF will assign columns from " + + "the returned Pandas DataFrame based on position, regardless of column label type. 
" + + "This configuration will be deprecated in future releases.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter") .internal() @@ -1884,8 +1884,8 @@ class SQLConf extends Serializable with Logging { def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE) - def pandasGroupedMapAssignColumnssByPosition: Boolean = - getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION) + def pandasGroupedMapAssignColumnsByName: Boolean = + getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME) def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER) http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala index 533097a..b1e8fb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala @@ -131,11 +131,8 @@ object ArrowUtils { } else { Nil } - val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnssByPosition) { - Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> "true") - } else { - Nil - } - Map(timeZoneConf ++ pandasColsByPosition: _*) + val pandasColsByName = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key -> + conf.pandasGroupedMapAssignColumnsByName.toString) + Map(timeZoneConf ++ pandasColsByName: _*) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org