This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2ddb6be4724 [SPARK-46298][PYTHON][CONNECT] Match deprecation warning, test case, and error of Catalog.createExternalTable
2ddb6be4724 is described below
commit 2ddb6be472431feceecd3daece8bafc8c80d7eb1
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Dec 7 14:00:41 2023 +0900
[SPARK-46298][PYTHON][CONNECT] Match deprecation warning, test case, and error of Catalog.createExternalTable
### What changes were proposed in this pull request?
This PR adds tests for catalog error cases for `createExternalTable`.
Also, this PR includes several minor cleanups:
- Show a deprecation warning for `spark.catalog.createExternalTable` (to match the non-Spark-Connect implementation); a short usage sketch follows this list.
- Remove `_reset` from `Catalog`, which is not used anywhere.
- Switch the Spark Connect implementation of `spark.catalog.createExternalTable` to call `spark.catalog.createTable` directly, and remove the corresponding Python plan definition that builds the protobuf message.
  - This PR does not remove the protobuf message definition itself, because of potential compatibility concerns.
### Why are the changes needed?
- For feature parity.
- To improve the test coverage.
See
https://app.codecov.io/gh/apache/spark/commit/1a651753f4e760643d719add3b16acd311454c76/blob/python/pyspark/sql/catalog.py
This is not being tested.
### Does this PR introduce _any_ user-facing change?
Virtually none (except the ones described above).
### How was this patch tested?
Manually ran the new unit test.
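For reference, one way to run just the touched module locally is via `unittest`; this is a sketch and assumes a Spark checkout with a built PySpark importable on `PYTHONPATH`:

```python
# Sketch: run pyspark.sql.tests.test_catalog directly with unittest. Assumes
# the Spark checkout's python/ directory (with a built PySpark) is importable.
import unittest

unittest.main(module="pyspark.sql.tests.test_catalog", argv=["unittest"], exit=False)
```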
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44226 from HyukjinKwon/SPARK-46298.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/catalog.py | 8 --------
python/pyspark/sql/connect/catalog.py | 22 ++++++++++++---------
python/pyspark/sql/connect/plan.py | 34 --------------------------------
python/pyspark/sql/tests/test_catalog.py | 9 ++++++++-
4 files changed, 21 insertions(+), 52 deletions(-)
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index b5337734b3b..6595659a4da 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -1237,14 +1237,6 @@ class Catalog:
"""
self._jcatalog.refreshByPath(path)
- def _reset(self) -> None:
- """(Internal use only) Drop all existing databases (except "default"),
tables,
- partitions and functions, and set the current database to "default".
-
- This is mainly used for tests.
- """
- self._jsparkSession.sessionState().catalog().reset()
-
def _test() -> None:
import os
diff --git a/python/pyspark/sql/connect/catalog.py b/python/pyspark/sql/connect/catalog.py
index 9143a03d324..ef1bff9d28c 100644
--- a/python/pyspark/sql/connect/catalog.py
+++ b/python/pyspark/sql/connect/catalog.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from pyspark.errors import PySparkTypeError
from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
@@ -215,16 +216,11 @@ class Catalog:
schema: Optional[StructType] = None,
**options: str,
) -> DataFrame:
- catalog = plan.CreateExternalTable(
- table_name=tableName,
- path=path, # type: ignore[arg-type]
- source=source,
- schema=schema,
- options=options,
+ warnings.warn(
+ "createExternalTable is deprecated since Spark 4.0, please use
createTable instead.",
+ FutureWarning,
)
- df = DataFrame(catalog, session=self._sparkSession)
- df._to_table() # Eager execution.
- return df
+ return self.createTable(tableName, path, source, schema, **options)
createExternalTable.__doc__ = PySparkCatalog.createExternalTable.__doc__
@@ -237,6 +233,14 @@ class Catalog:
description: Optional[str] = None,
**options: str,
) -> DataFrame:
+ if schema is not None and not isinstance(schema, StructType):
+ raise PySparkTypeError(
+ error_class="NOT_STRUCT",
+ message_parameters={
+ "arg_name": "schema",
+ "arg_type": type(schema).__name__,
+ },
+ )
catalog = plan.CreateTable(
table_name=tableName,
path=path, # type: ignore[arg-type]
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 67a33c2b6cf..cdc06b0f31c 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1997,40 +1997,6 @@ class FunctionExists(LogicalPlan):
return plan
-class CreateExternalTable(LogicalPlan):
- def __init__(
- self,
- table_name: str,
- path: str,
- source: Optional[str] = None,
- schema: Optional[DataType] = None,
- options: Mapping[str, str] = {},
- ) -> None:
- super().__init__(None)
- self._table_name = table_name
- self._path = path
- self._source = source
- self._schema = schema
- self._options = options
-
- def plan(self, session: "SparkConnectClient") -> proto.Relation:
- plan = self._create_proto_relation()
- plan.catalog.create_external_table.table_name = self._table_name
- if self._path is not None:
- plan.catalog.create_external_table.path = self._path
- if self._source is not None:
- plan.catalog.create_external_table.source = self._source
- if self._schema is not None:
- plan.catalog.create_external_table.schema.CopyFrom(
- pyspark_types_to_proto_types(self._schema)
- )
- for k in self._options.keys():
- v = self._options.get(k)
- if v is not None:
- plan.catalog.create_external_table.options[k] = v
- return plan
-
-
class CreateTable(LogicalPlan):
def __init__(
self,
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index b72172a402b..278fbbb2ba5 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
from pyspark import StorageLevel
-from pyspark.errors import AnalysisException
+from pyspark.errors import AnalysisException, PySparkTypeError
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.testing.sqlutils import ReusedSQLTestCase
@@ -81,6 +81,13 @@ class CatalogTestsMixin:
schema = StructType([StructField("a", IntegerType(), True)])
description = "this a table created via Catalog.createTable()"
+
+ with self.assertRaisesRegex(PySparkTypeError, "should be a struct type"):
+ # Test deprecated API and negative error case.
+ spark.catalog.createExternalTable(
+ "invalid_table_creation", schema=IntegerType(),
description=description
+ )
+
spark.catalog.createTable(
"tab3_via_catalog", schema=schema,
description=description
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]