This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5bc771b1fc1 [SPARK-41707][CONNECT] Implement Catalog API in Spark Connect
5bc771b1fc1 is described below
commit 5bc771b1fc1c2d9c198a49c5a882090634a774d0
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Dec 27 07:59:26 2022 +0900
[SPARK-41707][CONNECT] Implement Catalog API in Spark Connect
### What changes were proposed in this pull request?
This PR proposes to implement all Catalog APIs in Spark Connect, except
`cache`, `uncache`, and `isCached`, because of potential design issues (e.g.,
eager vs. lazy evaluation). Namely, it proposes the Catalog APIs below (a brief
usage sketch follows the list):
- `spark.catalog.currentDatabase`
- `spark.catalog.setCurrentDatabase`
- `spark.catalog.listDatabases`
- `spark.catalog.listTables`
- `spark.catalog.listFunctions`
- `spark.catalog.listColumns`
- `spark.catalog.getDatabase`
- `spark.catalog.getTable`
- `spark.catalog.getFunction`
- `spark.catalog.databaseExists`
- `spark.catalog.tableExists`
- `spark.catalog.functionExists`
- `spark.catalog.createExternalTable`
- `spark.catalog.createTable`
- `spark.catalog.dropTempView`
- `spark.catalog.dropGlobalTempView`
- `spark.catalog.recoverPartitions`
- `spark.catalog.clearCache`
- `spark.catalog.refreshTable`
- `spark.catalog.refreshByPath`
- `spark.catalog.currentCatalog`
- `spark.catalog.setCurrentCatalog`
- `spark.catalog.listCatalogs`
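For context, these behave the same way as the existing PySpark Catalog API. A short, illustrative sketch (assuming a Spark Connect `SparkSession` named `spark` and that the `default` database exists):

```python
# Illustrative only: assumes a Spark Connect SparkSession `spark`
# and that the `default` database exists.
spark.catalog.setCurrentDatabase("default")
print(spark.catalog.currentDatabase())          # 'default'
print(spark.catalog.databaseExists("default"))  # True
for table in spark.catalog.listTables():
    print(table.name, table.tableType, table.isTemporary)
```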
The main approach is to stick to what we already have in the DataFrame API,
following the same pattern as `DataFrame.show()`. Each result is put into a
DataFrame (e.g., a bool for `tableExists`, a string for `currentDatabase`, a
list of tables, etc.) and sent to the client side through Arrow batches, which
lets us avoid defining another set of Protobuf messages for the responses.
I explicitly marked these messages as internal-only for now because we will
likely need more design work here, for example, to take HMS Thrift definitions
into account.
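To illustrate the approach, here is a condensed sketch of the client-side round trip, adapted from the new `python/pyspark/sql/connect/catalog.py` in the diff below (simplified; not the full class):

```python
# Condensed sketch of the pattern in pyspark.sql.connect.catalog (see the diff below).
# Each Catalog call builds a catalog relation, executes it like any other query,
# and decodes the result from the Arrow-backed pandas DataFrame that comes back.
import pandas as pd

from pyspark.sql.connect import DataFrame, plan


class CatalogSketch:
    def __init__(self, sparkSession) -> None:
        self._sparkSession = sparkSession

    def _catalog_to_pandas(self, catalog: plan.LogicalPlan) -> pd.DataFrame:
        # The server packs the catalog result into a DataFrame and streams it
        # back as Arrow batches; toPandas() materializes it on the client.
        return DataFrame.withPlan(catalog, session=self._sparkSession).toPandas()

    def currentDatabase(self) -> str:
        # Single-cell string result, e.g. "default".
        return self._catalog_to_pandas(plan.CurrentDatabase()).iloc[0].iloc[0]

    def tableExists(self, tableName: str) -> bool:
        # Single-cell boolean result.
        pdf = self._catalog_to_pandas(plan.TableExists(table_name=tableName))
        return bool(pdf.iloc[0].iloc[0])
```

Side-effecting calls such as `setCurrentDatabase` or `refreshTable` follow the same shape; the server just returns an empty single-column local relation.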
In addition, this PR adds a `spark.table` API (an alias of `spark.read.table`)
in order to reuse the existing PySpark tests.
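A minimal usage sketch of the alias (illustrative; `tbl` is a hypothetical existing table):

```python
# Illustrative only: `tbl` is a hypothetical existing table.
df1 = spark.read.table("tbl")    # existing DataFrameReader API
df2 = spark.table("tbl")         # new alias added in this PR
assert df1.schema == df2.schema  # both resolve the same catalog table
```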
### Why are the changes needed?
For feature parity.
### Does this PR introduce _any_ user-facing change?
Yes, it adds Spark Connect support to `spark.catalog`.
### How was this patch tested?
Reused the existing PySpark Catalog unit tests.
Closes #39214 from HyukjinKwon/SPARK-41270.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../src/main/protobuf/spark/connect/catalog.proto | 199 ++++++
.../main/protobuf/spark/connect/relations.proto | 32 +
.../sql/connect/planner/SparkConnectPlanner.scala | 311 +++++++-
dev/sparktestsupport/modules.py | 1 +
python/pyspark/sql/catalog.py | 69 ++
python/pyspark/sql/connect/catalog.py | 307 ++++++++
python/pyspark/sql/connect/plan.py | 783 +++++++++++++++++++++
python/pyspark/sql/connect/proto/__init__.py | 1 +
python/pyspark/sql/connect/proto/catalog_pb2.py | 396 +++++++++++
python/pyspark/sql/connect/proto/catalog_pb2.pyi | 722 +++++++++++++++++++
python/pyspark/sql/connect/proto/relations_pb2.py | 175 ++---
python/pyspark/sql/connect/proto/relations_pb2.pyi | 220 ++++++
python/pyspark/sql/connect/session.py | 16 +
python/pyspark/sql/session.py | 6 +
.../sql/tests/connect/test_parity_catalog.py | 69 ++
python/pyspark/sql/tests/test_catalog.py | 86 ++-
python/pyspark/testing/sqlutils.py | 6 +-
17 files changed, 3270 insertions(+), 129 deletions(-)
diff --git
a/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
b/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
new file mode 100644
index 00000000000..cf5eb403754
--- /dev/null
+++ b/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "spark/connect/types.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+
+// See `spark.catalog.currentDatabase`
+message CurrentDatabase { }
+
+// See `spark.catalog.setCurrentDatabase`
+message SetCurrentDatabase {
+ // (Required)
+ string db_name = 1;
+}
+
+// See `spark.catalog.listDatabases`
+message ListDatabases { }
+
+// See `spark.catalog.listTables`
+message ListTables {
+ // (Optional)
+ optional string db_name = 1;
+}
+
+// See `spark.catalog.listFunctions`
+message ListFunctions {
+ // (Optional)
+ optional string db_name = 1;
+}
+
+// See `spark.catalog.listColumns`
+message ListColumns {
+ // (Required)
+ string table_name = 1;
+ // (Optional)
+ optional string db_name = 2;
+}
+
+// See `spark.catalog.getDatabase`
+message GetDatabase {
+ // (Required)
+ string db_name = 1;
+}
+
+// See `spark.catalog.getTable`
+message GetTable {
+ // (Required)
+ string table_name = 1;
+ // (Optional)
+ optional string db_name = 2;
+}
+
+// See `spark.catalog.getFunction`
+message GetFunction {
+ // (Required)
+ string function_name = 1;
+ // (Optional)
+ optional string db_name = 2;
+}
+
+// See `spark.catalog.databaseExists`
+message DatabaseExists {
+ // (Required)
+ string db_name = 1;
+}
+
+// See `spark.catalog.tableExists`
+message TableExists {
+ // (Required)
+ string table_name = 1;
+ // (Optional)
+ optional string db_name = 2;
+}
+
+// See `spark.catalog.functionExists`
+message FunctionExists {
+ // (Required)
+ string function_name = 1;
+ // (Optional)
+ optional string db_name = 2;
+}
+
+// See `spark.catalog.createExternalTable`
+message CreateExternalTable {
+ // (Required)
+ string table_name = 1;
+ // (Optional)
+ optional string path = 2;
+ // (Optional)
+ optional string source = 3;
+ // (Optional)
+ optional DataType schema = 4;
+ // Options could be empty for valid data source format.
+ // The map key is case insensitive.
+ map<string, string> options = 5;
+}
+
+// See `spark.catalog.createTable`
+message CreateTable {
+ // (Required)
+ string table_name = 1;
+ // (Optional)
+ optional string path = 2;
+ // (Optional)
+ optional string source = 3;
+ // (Optional)
+ optional string description = 4;
+ // (Optional)
+ optional DataType schema = 5;
+ // Options could be empty for valid data source format.
+ // The map key is case insensitive.
+ map<string, string> options = 6;
+}
+
+// See `spark.catalog.dropTempView`
+message DropTempView {
+ // (Required)
+ string view_name = 1;
+}
+
+// See `spark.catalog.dropGlobalTempView`
+message DropGlobalTempView {
+ // (Required)
+ string view_name = 1;
+}
+
+// See `spark.catalog.recoverPartitions`
+message RecoverPartitions {
+ // (Required)
+ string table_name = 1;
+}
+
+// TODO(SPARK-41612): Support Catalog.isCached
+//// See `spark.catalog.isCached`
+//message IsCached {
+// // (Required)
+// string table_name = 1;
+//}
+//
+// TODO(SPARK-41600): Support Catalog.cacheTable
+//// See `spark.catalog.cacheTable`
+//message CacheTable {
+// // (Required)
+// string table_name = 1;
+//}
+//
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+//// See `spark.catalog.uncacheTable`
+//message UncacheTable {
+// // (Required)
+// string table_name = 1;
+//}
+
+// See `spark.catalog.clearCache`
+message ClearCache { }
+
+// See `spark.catalog.refreshTable`
+message RefreshTable {
+ // (Required)
+ string table_name = 1;
+}
+
+// See `spark.catalog.refreshByPath`
+message RefreshByPath {
+ // (Required)
+ string path = 1;
+}
+
+// See `spark.catalog.currentCatalog`
+message CurrentCatalog { }
+
+// See `spark.catalog.setCurrentCatalog`
+message SetCurrentCatalog {
+ // (Required)
+ string catalog_name = 1;
+}
+
+// See `spark.catalog.listCatalogs`
+message ListCatalogs { }
diff --git
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index b40a865cb29..ca17b220841 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -21,6 +21,7 @@ package spark.connect;
import "spark/connect/expressions.proto";
import "spark/connect/types.proto";
+import "spark/connect/catalog.proto";
option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";
@@ -68,6 +69,37 @@ message Relation {
StatCrosstab crosstab = 101;
StatDescribe describe = 102;
+ // Catalog API (internal-only)
+ CurrentDatabase current_database = 201;
+ SetCurrentDatabase set_current_database = 202;
+ ListDatabases list_databases = 203;
+ ListTables list_tables = 204;
+ ListFunctions list_functions = 205;
+ ListColumns list_columns = 206;
+ GetDatabase get_database = 207;
+ GetTable get_table = 208;
+ GetFunction get_function = 209;
+ DatabaseExists database_exists = 210;
+ TableExists table_exists = 211;
+ FunctionExists function_exists = 212;
+ CreateExternalTable create_external_table = 213;
+ CreateTable create_table = 214;
+ DropTempView drop_temp_view = 215;
+ DropGlobalTempView drop_global_temp_view = 216;
+ RecoverPartitions recover_partitions = 217;
+// TODO(SPARK-41612): Support Catalog.isCached
+// IsCached is_cached = 218;
+// TODO(SPARK-41600): Support Catalog.cacheTable
+// CacheTable cache_table = 219;
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+// UncacheTable uncache_table = 220;
+ ClearCache clear_cache = 221;
+ RefreshTable refresh_table = 222;
+ RefreshByPath refresh_by_path = 223;
+ CurrentCatalog current_catalog = 224;
+ SetCurrentCatalog set_current_catalog = 225;
+ ListCatalogs list_catalogs = 226;
+
Unknown unknown = 999;
}
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index cb8d30b180c..21ff96f158e 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -25,8 +25,7 @@ import com.google.common.collect.{Lists, Maps}
import org.apache.spark.TaskContext
import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.WriteOperation
-import org.apache.spark.sql.{Column, Dataset, SparkSession}
+import org.apache.spark.sql.{Column, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier,
FunctionIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView,
MultiAlias, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction,
UnresolvedRegex, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions._
@@ -41,6 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.execution.command.CreateViewCommand
import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.internal.CatalogImpl
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -61,6 +61,7 @@ class SparkConnectPlanner(session: SparkSession) {
// The root of the query plan is a relation and we apply the transformations
to it.
def transformRelation(rel: proto.Relation): LogicalPlan = {
rel.getRelTypeCase match {
+ // DataFrame API
case proto.Relation.RelTypeCase.SHOW_STRING =>
transformShowString(rel.getShowString)
case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead)
case proto.Relation.RelTypeCase.PROJECT =>
transformProject(rel.getProject)
@@ -99,6 +100,50 @@ class SparkConnectPlanner(session: SparkSession) {
case proto.Relation.RelTypeCase.UNPIVOT =>
transformUnpivot(rel.getUnpivot)
case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
throw new IndexOutOfBoundsException("Expected Relation to be set, but
is empty.")
+
+ // Catalog API (internal-only)
+ case proto.Relation.RelTypeCase.CURRENT_DATABASE =>
+ transformCurrentDatabase(rel.getCurrentDatabase)
+ case proto.Relation.RelTypeCase.SET_CURRENT_DATABASE =>
+ transformSetCurrentDatabase(rel.getSetCurrentDatabase)
+ case proto.Relation.RelTypeCase.LIST_DATABASES =>
+ transformListDatabases(rel.getListDatabases)
+ case proto.Relation.RelTypeCase.LIST_TABLES =>
transformListTables(rel.getListTables)
+ case proto.Relation.RelTypeCase.LIST_FUNCTIONS =>
+ transformListFunctions(rel.getListFunctions)
+ case proto.Relation.RelTypeCase.LIST_COLUMNS =>
transformListColumns(rel.getListColumns)
+ case proto.Relation.RelTypeCase.GET_DATABASE =>
transformGetDatabase(rel.getGetDatabase)
+ case proto.Relation.RelTypeCase.GET_TABLE =>
transformGetTable(rel.getGetTable)
+ case proto.Relation.RelTypeCase.GET_FUNCTION =>
transformGetFunction(rel.getGetFunction)
+ case proto.Relation.RelTypeCase.DATABASE_EXISTS =>
+ transformDatabaseExists(rel.getDatabaseExists)
+ case proto.Relation.RelTypeCase.TABLE_EXISTS =>
transformTableExists(rel.getTableExists)
+ case proto.Relation.RelTypeCase.FUNCTION_EXISTS =>
+ transformFunctionExists(rel.getFunctionExists)
+ case proto.Relation.RelTypeCase.CREATE_EXTERNAL_TABLE =>
+ transformCreateExternalTable(rel.getCreateExternalTable)
+ case proto.Relation.RelTypeCase.CREATE_TABLE =>
transformCreateTable(rel.getCreateTable)
+ case proto.Relation.RelTypeCase.DROP_TEMP_VIEW =>
transformDropTempView(rel.getDropTempView)
+ case proto.Relation.RelTypeCase.DROP_GLOBAL_TEMP_VIEW =>
+ transformDropGlobalTempView(rel.getDropGlobalTempView)
+ case proto.Relation.RelTypeCase.RECOVER_PARTITIONS =>
+ transformRecoverPartitions(rel.getRecoverPartitions)
+// TODO(SPARK-41612): Support Catalog.isCached
+// case proto.Relation.RelTypeCase.IS_CACHED =>
transformIsCached(rel.getIsCached)
+// TODO(SPARK-41600): Support Catalog.cacheTable
+// case proto.Relation.RelTypeCase.CACHE_TABLE =>
transformCacheTable(rel.getCacheTable)
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+// case proto.Relation.RelTypeCase.UNCACHE_TABLE =>
transformUncacheTable(rel.getUncacheTable)
+ case proto.Relation.RelTypeCase.CLEAR_CACHE =>
transformClearCache(rel.getClearCache)
+ case proto.Relation.RelTypeCase.REFRESH_TABLE =>
transformRefreshTable(rel.getRefreshTable)
+ case proto.Relation.RelTypeCase.REFRESH_BY_PATH =>
+ transformRefreshByPath(rel.getRefreshByPath)
+ case proto.Relation.RelTypeCase.CURRENT_CATALOG =>
+ transformCurrentCatalog(rel.getCurrentCatalog)
+ case proto.Relation.RelTypeCase.SET_CURRENT_CATALOG =>
+ transformSetCurrentCatalog(rel.getSetCurrentCatalog)
+ case proto.Relation.RelTypeCase.LIST_CATALOGS =>
transformListCatalogs(rel.getListCatalogs)
+
case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
}
}
@@ -1041,7 +1086,7 @@ class SparkConnectPlanner(session: SparkSession) {
*
* @param writeOperation
*/
- def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+ def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = {
// Transform the input plan into the logical plan.
val planner = new SparkConnectPlanner(session)
val plan = planner.transformRelation(writeOperation.getInput)
@@ -1092,4 +1137,264 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
+ private val emptyLocalRelation = LocalRelation(
+ output = AttributeReference("value", StringType, false)() :: Nil,
+ data = Seq.empty)
+
+ private def transformCurrentDatabase(getCurrentDatabase:
proto.CurrentDatabase): LogicalPlan = {
+ session.createDataset(session.catalog.currentDatabase ::
Nil)(Encoders.STRING).logicalPlan
+ }
+
+ private def transformSetCurrentDatabase(
+ getSetCurrentDatabase: proto.SetCurrentDatabase): LogicalPlan = {
+ session.catalog.setCurrentDatabase(getSetCurrentDatabase.getDbName)
+ emptyLocalRelation
+ }
+
+ private def transformListDatabases(getListDatabases: proto.ListDatabases):
LogicalPlan = {
+ session.catalog.listDatabases().logicalPlan
+ }
+
+ private def transformListTables(getListTables: proto.ListTables):
LogicalPlan = {
+ if (getListTables.hasDbName) {
+ session.catalog.listTables(getListTables.getDbName).logicalPlan
+ } else {
+ session.catalog.listTables().logicalPlan
+ }
+ }
+
+ private def transformListFunctions(getListFunctions: proto.ListFunctions):
LogicalPlan = {
+ if (getListFunctions.hasDbName) {
+ session.catalog.listFunctions(getListFunctions.getDbName).logicalPlan
+ } else {
+ session.catalog.listFunctions().logicalPlan
+ }
+ }
+
+ private def transformListColumns(getListColumns: proto.ListColumns):
LogicalPlan = {
+ if (getListColumns.hasDbName) {
+ session.catalog
+ .listColumns(dbName = getListColumns.getDbName, tableName =
getListColumns.getTableName)
+ .logicalPlan
+ } else {
+ session.catalog.listColumns(getListColumns.getTableName).logicalPlan
+ }
+ }
+
+ private def transformGetDatabase(getGetDatabase: proto.GetDatabase):
LogicalPlan = {
+ CatalogImpl
+ .makeDataset(session.catalog.getDatabase(getGetDatabase.getDbName) ::
Nil, session)
+ .logicalPlan
+ }
+
+ private def transformGetTable(getGetTable: proto.GetTable): LogicalPlan = {
+ if (getGetTable.hasDbName) {
+ CatalogImpl
+ .makeDataset(
+ session.catalog.getTable(
+ dbName = getGetTable.getDbName,
+ tableName = getGetTable.getTableName) :: Nil,
+ session)
+ .logicalPlan
+ } else {
+ CatalogImpl
+ .makeDataset(session.catalog.getTable(getGetTable.getTableName) ::
Nil, session)
+ .logicalPlan
+ }
+ }
+
+ private def transformGetFunction(getGetFunction: proto.GetFunction):
LogicalPlan = {
+ if (getGetFunction.hasDbName) {
+ CatalogImpl
+ .makeDataset(
+ session.catalog.getFunction(
+ dbName = getGetFunction.getDbName,
+ functionName = getGetFunction.getFunctionName) :: Nil,
+ session)
+ .logicalPlan
+ } else {
+ CatalogImpl
+
.makeDataset(session.catalog.getFunction(getGetFunction.getFunctionName) ::
Nil, session)
+ .logicalPlan
+ }
+ }
+
+ private def transformDatabaseExists(getDatabaseExists:
proto.DatabaseExists): LogicalPlan = {
+ session
+
.createDataset(session.catalog.databaseExists(getDatabaseExists.getDbName) ::
Nil)(
+ Encoders.scalaBoolean)
+ .logicalPlan
+ }
+
+ private def transformTableExists(getTableExists: proto.TableExists):
LogicalPlan = {
+ if (getTableExists.hasDbName) {
+ session
+ .createDataset(
+ session.catalog.tableExists(
+ dbName = getTableExists.getDbName,
+ tableName = getTableExists.getTableName) ::
Nil)(Encoders.scalaBoolean)
+ .logicalPlan
+ } else {
+ session
+
.createDataset(session.catalog.tableExists(getTableExists.getTableName) :: Nil)(
+ Encoders.scalaBoolean)
+ .logicalPlan
+ }
+ }
+
+ private def transformFunctionExists(getFunctionExists:
proto.FunctionExists): LogicalPlan = {
+ if (getFunctionExists.hasDbName) {
+ session
+ .createDataset(
+ session.catalog.functionExists(
+ dbName = getFunctionExists.getDbName,
+ functionName = getFunctionExists.getFunctionName) ::
Nil)(Encoders.scalaBoolean)
+ .logicalPlan
+ } else {
+ session
+
.createDataset(session.catalog.functionExists(getFunctionExists.getFunctionName)
:: Nil)(
+ Encoders.scalaBoolean)
+ .logicalPlan
+ }
+ }
+
+ private def transformCreateExternalTable(
+ getCreateExternalTable: proto.CreateExternalTable): LogicalPlan = {
+ val schema = if (getCreateExternalTable.hasSchema) {
+ val struct =
DataTypeProtoConverter.toCatalystType(getCreateExternalTable.getSchema)
+ assert(struct.isInstanceOf[StructType])
+ struct.asInstanceOf[StructType]
+ } else {
+ new StructType
+ }
+
+ val source = if (getCreateExternalTable.hasSource) {
+ getCreateExternalTable.getSource
+ } else {
+ session.sessionState.conf.defaultDataSourceName
+ }
+
+ val options = if (getCreateExternalTable.hasPath) {
+ (getCreateExternalTable.getOptionsMap.asScala ++
+ Map("path" -> getCreateExternalTable.getPath)).asJava
+ } else {
+ getCreateExternalTable.getOptionsMap
+ }
+ session.catalog
+ .createTable(
+ tableName = getCreateExternalTable.getTableName,
+ source = source,
+ schema = schema,
+ options = options)
+ .logicalPlan
+ }
+
+ private def transformCreateTable(getCreateTable: proto.CreateTable):
LogicalPlan = {
+ val schema = if (getCreateTable.hasSchema) {
+ val struct =
DataTypeProtoConverter.toCatalystType(getCreateTable.getSchema)
+ assert(struct.isInstanceOf[StructType])
+ struct.asInstanceOf[StructType]
+ } else {
+ new StructType
+ }
+
+ val source = if (getCreateTable.hasSource) {
+ getCreateTable.getSource
+ } else {
+ session.sessionState.conf.defaultDataSourceName
+ }
+
+ val description = if (getCreateTable.hasDescription) {
+ getCreateTable.getDescription
+ } else {
+ ""
+ }
+
+ val options = if (getCreateTable.hasPath) {
+ (getCreateTable.getOptionsMap.asScala ++
+ Map("path" -> getCreateTable.getPath)).asJava
+ } else {
+ getCreateTable.getOptionsMap
+ }
+
+ session.catalog
+ .createTable(
+ tableName = getCreateTable.getTableName,
+ source = source,
+ schema = schema,
+ description = description,
+ options = options)
+ .logicalPlan
+ }
+
+ private def transformDropTempView(getDropTempView: proto.DropTempView):
LogicalPlan = {
+ session
+ .createDataset(session.catalog.dropTempView(getDropTempView.getViewName)
:: Nil)(
+ Encoders.scalaBoolean)
+ .logicalPlan
+ }
+
+ private def transformDropGlobalTempView(
+ getDropGlobalTempView: proto.DropGlobalTempView): LogicalPlan = {
+ session
+ .createDataset(
+ session.catalog.dropGlobalTempView(getDropGlobalTempView.getViewName)
:: Nil)(
+ Encoders.scalaBoolean)
+ .logicalPlan
+ }
+
+ private def transformRecoverPartitions(
+ getRecoverPartitions: proto.RecoverPartitions): LogicalPlan = {
+ session.catalog.recoverPartitions(getRecoverPartitions.getTableName)
+ emptyLocalRelation
+ }
+
+// TODO(SPARK-41612): Support Catalog.isCached
+// private def transformIsCached(getIsCached: proto.IsCached): LogicalPlan = {
+// session
+// .createDataset(session.catalog.isCached(getIsCached.getTableName) ::
Nil)(
+// Encoders.scalaBoolean)
+// .logicalPlan
+// }
+//
+// TODO(SPARK-41600): Support Catalog.cacheTable
+// private def transformCacheTable(getCacheTable: proto.CacheTable):
LogicalPlan = {
+// session.catalog.cacheTable(getCacheTable.getTableName)
+// emptyLocalRelation
+// }
+//
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+// private def transformUncacheTable(getUncacheTable: proto.UncacheTable):
LogicalPlan = {
+// session.catalog.uncacheTable(getUncacheTable.getTableName)
+// emptyLocalRelation
+// }
+
+ private def transformClearCache(getClearCache: proto.ClearCache):
LogicalPlan = {
+ session.catalog.clearCache()
+ emptyLocalRelation
+ }
+
+ private def transformRefreshTable(getRefreshTable: proto.RefreshTable):
LogicalPlan = {
+ session.catalog.refreshTable(getRefreshTable.getTableName)
+ emptyLocalRelation
+ }
+
+ private def transformRefreshByPath(getRefreshByPath: proto.RefreshByPath):
LogicalPlan = {
+ session.catalog.refreshByPath(getRefreshByPath.getPath)
+ emptyLocalRelation
+ }
+
+ private def transformCurrentCatalog(getCurrentCatalog:
proto.CurrentCatalog): LogicalPlan = {
+ session.createDataset(session.catalog.currentCatalog() ::
Nil)(Encoders.STRING).logicalPlan
+ }
+
+ private def transformSetCurrentCatalog(
+ getSetCurrentCatalog: proto.SetCurrentCatalog): LogicalPlan = {
+ session.catalog.setCurrentCatalog(getSetCurrentCatalog.getCatalogName)
+ emptyLocalRelation
+ }
+
+ private def transformListCatalogs(getListCatalogs: proto.ListCatalogs):
LogicalPlan = {
+ session.catalog.listCatalogs().logicalPlan
+ }
}
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 51f5246d741..84e258d20e9 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -512,6 +512,7 @@ pyspark_connect = Module(
"pyspark.sql.tests.connect.test_connect_basic",
"pyspark.sql.tests.connect.test_connect_function",
"pyspark.sql.tests.connect.test_connect_column",
+ "pyspark.sql.tests.connect.test_parity_catalog",
"pyspark.sql.tests.connect.test_parity_functions",
"pyspark.sql.tests.connect.test_parity_dataframe",
],
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 4e8c5d96d01..77dbc3e024a 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -91,6 +91,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Examples
--------
>>> spark.catalog.currentCatalog()
@@ -103,6 +106,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
catalogName : str
@@ -119,6 +125,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Returns
-------
list
@@ -137,6 +146,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Returns
-------
str
@@ -155,6 +167,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Examples
--------
>>> spark.catalog.setCurrentDatabase("default")
@@ -167,6 +182,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Returns
-------
list
@@ -197,6 +215,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
dbName : str
@@ -230,6 +251,9 @@ class Catalog:
.. versionadded:: 3.3.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
dbName : str
@@ -266,6 +290,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
dbName : str
@@ -325,6 +352,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
@@ -381,6 +411,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
dbName : str
@@ -432,6 +465,9 @@ class Catalog:
.. versionadded:: 3.3.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
functionName : str
@@ -481,6 +517,9 @@ class Catalog:
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
functionName : str
@@ -531,6 +570,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
@@ -594,6 +636,9 @@ class Catalog:
.. versionadded:: 3.3.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
@@ -692,6 +737,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Returns
-------
:class:`DataFrame`
@@ -715,6 +763,9 @@ class Catalog:
.. versionadded:: 2.2.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
@@ -785,6 +836,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
viewName : str
@@ -822,6 +876,9 @@ class Catalog:
.. versionadded:: 2.1.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
viewName : str
@@ -987,6 +1044,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Examples
--------
>>> _ = spark.sql("DROP TABLE IF EXISTS tbl1")
@@ -1003,6 +1063,9 @@ class Catalog:
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
@@ -1051,6 +1114,9 @@ class Catalog:
.. versionadded:: 2.1.1
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
@@ -1095,6 +1161,9 @@ class Catalog:
.. versionadded:: 2.2.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
path : str
diff --git a/python/pyspark/sql/connect/catalog.py
b/python/pyspark/sql/connect/catalog.py
new file mode 100644
index 00000000000..ed673f1d0d3
--- /dev/null
+++ b/python/pyspark/sql/connect/catalog.py
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import List, Optional, TYPE_CHECKING
+
+import pandas as pd
+
+from pyspark.sql.types import StructType
+from pyspark.sql.connect import DataFrame
+from pyspark.sql.catalog import (
+ Catalog as PySparkCatalog,
+ CatalogMetadata,
+ Database,
+ Table,
+ Function,
+ Column,
+)
+from pyspark.sql.connect import plan
+
+if TYPE_CHECKING:
+ from pyspark.sql.connect.session import SparkSession
+
+
+class Catalog:
+ def __init__(self, sparkSession: "SparkSession") -> None:
+ self._sparkSession = sparkSession
+
+ # TODO(SPARK-41716): Probably should factor out to
pyspark.sql.connect.client.
+ def _catalog_to_pandas(self, catalog: plan.LogicalPlan) -> pd.DataFrame:
+ pdf = DataFrame.withPlan(catalog,
session=self._sparkSession).toPandas()
+ assert pdf is not None
+ return pdf
+
+ def currentCatalog(self) -> str:
+ pdf = self._catalog_to_pandas(plan.CurrentCatalog())
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ currentCatalog.__doc__ = PySparkCatalog.currentCatalog.__doc__
+
+ def setCurrentCatalog(self, catalogName: str) -> None:
+
self._catalog_to_pandas(plan.SetCurrentCatalog(catalog_name=catalogName))
+
+ setCurrentCatalog.__doc__ = PySparkCatalog.setCurrentCatalog.__doc__
+
+ def listCatalogs(self) -> List[CatalogMetadata]:
+ pdf = self._catalog_to_pandas(plan.ListCatalogs())
+ return [
+ CatalogMetadata(name=row.iloc[0], description=row.iloc[1]) for _,
row in pdf.iterrows()
+ ]
+
+ listCatalogs.__doc__ = PySparkCatalog.listCatalogs.__doc__
+
+ def currentDatabase(self) -> str:
+ pdf = self._catalog_to_pandas(plan.CurrentDatabase())
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ currentDatabase.__doc__ = PySparkCatalog.currentDatabase.__doc__
+
+ def setCurrentDatabase(self, dbName: str) -> None:
+ self._catalog_to_pandas(plan.SetCurrentDatabase(db_name=dbName))
+
+ setCurrentDatabase.__doc__ = PySparkCatalog.setCurrentDatabase.__doc__
+
+ def listDatabases(self) -> List[Database]:
+ pdf = self._catalog_to_pandas(plan.ListDatabases())
+ return [
+ Database(
+ name=row.iloc[0],
+ catalog=row.iloc[1],
+ description=row.iloc[2],
+ locationUri=row.iloc[3],
+ )
+ for _, row in pdf.iterrows()
+ ]
+
+ listDatabases.__doc__ = PySparkCatalog.listDatabases.__doc__
+
+ def getDatabase(self, dbName: str) -> Database:
+ pdf = self._catalog_to_pandas(plan.GetDatabase(db_name=dbName))
+ assert pdf is not None
+ row = pdf.iloc[0]
+ return Database(
+ name=row[0],
+ catalog=row[1],
+ description=row[2],
+ locationUri=row[3],
+ )
+
+ getDatabase.__doc__ = PySparkCatalog.getDatabase.__doc__
+
+ def databaseExists(self, dbName: str) -> bool:
+ pdf = self._catalog_to_pandas(plan.DatabaseExists(db_name=dbName))
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ databaseExists.__doc__ = PySparkCatalog.databaseExists.__doc__
+
+ def listTables(self, dbName: Optional[str] = None) -> List[Table]:
+ pdf = self._catalog_to_pandas(plan.ListTables(db_name=dbName))
+ return [
+ Table(
+ name=row.iloc[0],
+ catalog=row.iloc[1],
+ # If empty or None, returns None.
+ namespace=None if row.iloc[2] is None else list(row.iloc[2])
or None,
+ description=row.iloc[3],
+ tableType=row.iloc[4],
+ isTemporary=row.iloc[5],
+ )
+ for _, row in pdf.iterrows()
+ ]
+
+ listTables.__doc__ = PySparkCatalog.listTables.__doc__
+
+ def getTable(self, tableName: str) -> Table:
+ pdf = self._catalog_to_pandas(plan.GetTable(table_name=tableName))
+ assert pdf is not None
+ row = pdf.iloc[0]
+ return Table(
+ name=row.iloc[0],
+ catalog=row.iloc[1],
+ # If empty or None, returns None.
+ namespace=None if row.iloc[2] is None else list(row.iloc[2]) or
None,
+ description=row.iloc[3],
+ tableType=row.iloc[4],
+ isTemporary=row.iloc[5],
+ )
+
+ getTable.__doc__ = PySparkCatalog.getTable.__doc__
+
+ def listFunctions(self, dbName: Optional[str] = None) -> List[Function]:
+ pdf = self._catalog_to_pandas(plan.ListFunctions(db_name=dbName))
+ return [
+ Function(
+ name=row.iloc[0],
+ catalog=row.iloc[1],
+ # If empty or None, returns None.
+ namespace=None if row.iloc[2] is None else list(row.iloc[2])
or None,
+ description=row.iloc[3],
+ className=row.iloc[4],
+ isTemporary=row.iloc[5],
+ )
+ for _, row in pdf.iterrows()
+ ]
+
+ listFunctions.__doc__ = PySparkCatalog.listFunctions.__doc__
+
+ def functionExists(self, functionName: str, dbName: Optional[str] = None)
-> bool:
+ pdf = self._catalog_to_pandas(
+ plan.FunctionExists(function_name=functionName, db_name=dbName)
+ )
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ functionExists.__doc__ = PySparkCatalog.functionExists.__doc__
+
+ def getFunction(self, functionName: str) -> Function:
+ pdf =
self._catalog_to_pandas(plan.GetFunction(function_name=functionName))
+ assert pdf is not None
+ row = pdf.iloc[0]
+ return Function(
+ name=row.iloc[0],
+ catalog=row.iloc[1],
+ # If empty or None, returns None.
+ namespace=None if row.iloc[2] is None else list(row.iloc[2]) or
None,
+ description=row.iloc[3],
+ className=row.iloc[4],
+ isTemporary=row.iloc[5],
+ )
+
+ getFunction.__doc__ = PySparkCatalog.getFunction.__doc__
+
+ def listColumns(self, tableName: str, dbName: Optional[str] = None) ->
List[Column]:
+ pdf = self._catalog_to_pandas(plan.ListColumns(table_name=tableName,
db_name=dbName))
+ return [
+ Column(
+ name=row.iloc[0],
+ description=row.iloc[1],
+ dataType=row.iloc[2],
+ nullable=row.iloc[3],
+ isPartition=row.iloc[4],
+ isBucket=row.iloc[5],
+ )
+ for _, row in pdf.iterrows()
+ ]
+
+ listColumns.__doc__ = PySparkCatalog.listColumns.__doc__
+
+ def tableExists(self, tableName: str, dbName: Optional[str] = None) ->
bool:
+ pdf = self._catalog_to_pandas(plan.TableExists(table_name=tableName,
db_name=dbName))
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ tableExists.__doc__ = PySparkCatalog.tableExists.__doc__
+
+ def createExternalTable(
+ self,
+ tableName: str,
+ path: Optional[str] = None,
+ source: Optional[str] = None,
+ schema: Optional[StructType] = None,
+ **options: str,
+ ) -> DataFrame:
+ catalog = plan.CreateExternalTable(
+ table_name=tableName,
+ path=path, # type: ignore[arg-type]
+ source=source,
+ schema=schema,
+ options=options,
+ )
+ df = DataFrame.withPlan(catalog, session=self._sparkSession)
+ df.toPandas() # Eager execution.
+ return df
+
+ createExternalTable.__doc__ = PySparkCatalog.createExternalTable.__doc__
+
+ def createTable(
+ self,
+ tableName: str,
+ path: Optional[str] = None,
+ source: Optional[str] = None,
+ schema: Optional[StructType] = None,
+ description: Optional[str] = None,
+ **options: str,
+ ) -> DataFrame:
+ catalog = plan.CreateTable(
+ table_name=tableName,
+ path=path, # type: ignore[arg-type]
+ source=source,
+ schema=schema,
+ description=description,
+ options=options,
+ )
+ df = DataFrame.withPlan(catalog, session=self._sparkSession)
+ df.toPandas() # Eager execution.
+ return df
+
+ createTable.__doc__ = PySparkCatalog.createTable.__doc__
+
+ def dropTempView(self, viewName: str) -> bool:
+ pdf = self._catalog_to_pandas(plan.DropTempView(view_name=viewName))
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ dropTempView.__doc__ = PySparkCatalog.dropTempView.__doc__
+
+ def dropGlobalTempView(self, viewName: str) -> bool:
+ pdf =
self._catalog_to_pandas(plan.DropGlobalTempView(view_name=viewName))
+ assert pdf is not None
+ return pdf.iloc[0].iloc[0]
+
+ dropGlobalTempView.__doc__ = PySparkCatalog.dropGlobalTempView.__doc__
+
+ # TODO(SPARK-41612): Support Catalog.isCached
+ # def isCached(self, tableName: str) -> bool:
+ # pdf = self._catalog_to_pandas(plan.IsCached(table_name=tableName))
+ # assert pdf is not None
+ # return pdf.iloc[0].iloc[0]
+ #
+ # isCached.__doc__ = PySparkCatalog.isCached.__doc__
+ #
+ # TODO(SPARK-41600): Support Catalog.cacheTable
+ # def cacheTable(self, tableName: str) -> None:
+ # self._catalog_to_pandas(plan.CacheTable(table_name=tableName))
+ #
+ # cacheTable.__doc__ = PySparkCatalog.cacheTable.__doc__
+ #
+ # TODO(SPARK-41623): Support Catalog.uncacheTable
+ # def uncacheTable(self, tableName: str) -> None:
+ # self._catalog_to_pandas(plan.UncacheTable(table_name=tableName))
+ #
+ # uncacheTable.__doc__ = PySparkCatalog.uncacheTable.__doc__
+
+ def clearCache(self) -> None:
+ self._catalog_to_pandas(plan.ClearCache())
+
+ clearCache.__doc__ = PySparkCatalog.clearCache.__doc__
+
+ def refreshTable(self, tableName: str) -> None:
+ self._catalog_to_pandas(plan.RefreshTable(table_name=tableName))
+
+ refreshTable.__doc__ = PySparkCatalog.refreshTable.__doc__
+
+ def recoverPartitions(self, tableName: str) -> None:
+ self._catalog_to_pandas(plan.RecoverPartitions(table_name=tableName))
+
+ recoverPartitions.__doc__ = PySparkCatalog.recoverPartitions.__doc__
+
+ def refreshByPath(self, path: str) -> None:
+ self._catalog_to_pandas(plan.RefreshByPath(path=path))
+
+ refreshByPath.__doc__ = PySparkCatalog.refreshByPath.__doc__
diff --git a/python/pyspark/sql/connect/plan.py
b/python/pyspark/sql/connect/plan.py
index fe2105d7d2c..468d028cee2 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -95,6 +95,7 @@ class LogicalPlan(object):
return plan
+ # TODO(SPARK-41717): Implement the command logic for print and _repr_html_
def print(self, indent: int = 0) -> str:
...
@@ -1569,3 +1570,785 @@ class WriteOperation(LogicalPlan):
f"</li></ul>"
)
pass
+
+
+class CurrentDatabase(LogicalPlan):
+ def __init__(self) -> None:
+ super().__init__(None)
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ return proto.Relation(current_database=proto.CurrentDatabase())
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b>
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class SetCurrentDatabase(LogicalPlan):
+ def __init__(self, db_name: str) -> None:
+ super().__init__(None)
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.set_current_database.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class ListDatabases(LogicalPlan):
+ def __init__(self) -> None:
+ super().__init__(None)
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ return proto.Relation(list_databases=proto.ListDatabases())
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b>
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class ListTables(LogicalPlan):
+ def __init__(self, db_name: Optional[str] = None) -> None:
+ super().__init__(None)
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ if self._db_name is not None:
+ plan.list_tables.db_name = self._db_name
+ else:
+ plan = proto.Relation(list_tables=proto.ListTables())
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class ListFunctions(LogicalPlan):
+ def __init__(self, db_name: Optional[str] = None) -> None:
+ super().__init__(None)
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ if self._db_name is not None:
+ plan = proto.Relation()
+ plan.list_functions.db_name = self._db_name
+ else:
+ plan = proto.Relation(list_functions=proto.ListFunctions())
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class ListColumns(LogicalPlan):
+ def __init__(self, table_name: str, db_name: Optional[str] = None) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.list_columns.table_name = self._table_name
+ if self._db_name is not None:
+ plan.list_columns.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"table_name='{self._table_name}' "
+ f"db_name='{self._db_name}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class GetDatabase(LogicalPlan):
+ def __init__(self, db_name: str) -> None:
+ super().__init__(None)
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.get_database.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class GetTable(LogicalPlan):
+ def __init__(self, table_name: str, db_name: Optional[str] = None) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.get_table.table_name = self._table_name
+ if self._db_name is not None:
+ plan.get_table.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"table_name='{self._table_name}' "
+ f"db_name='{self._db_name}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class GetFunction(LogicalPlan):
+ def __init__(self, function_name: str, db_name: Optional[str] = None) ->
None:
+ super().__init__(None)
+ self._function_name = function_name
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.get_function.function_name = self._function_name
+ if self._db_name is not None:
+ plan.get_function.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"function_name='{self._function_name}' "
+ f"db_name='{self._db_name}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ function_name: {self._function_name} <br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class DatabaseExists(LogicalPlan):
+ def __init__(self, db_name: str) -> None:
+ super().__init__(None)
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.database_exists.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class TableExists(LogicalPlan):
+ def __init__(self, table_name: str, db_name: Optional[str] = None) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.table_exists.table_name = self._table_name
+ if self._db_name is not None:
+ plan.table_exists.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"table_name='{self._table_name}' "
+ f"db_name='{self._db_name}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class FunctionExists(LogicalPlan):
+ def __init__(self, function_name: str, db_name: Optional[str] = None) ->
None:
+ super().__init__(None)
+ self._function_name = function_name
+ self._db_name = db_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.function_exists.function_name = self._function_name
+ if self._db_name is not None:
+ plan.function_exists.db_name = self._db_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"function_name='{self._function_name}' "
+ f"db_name='{self._db_name}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ function_name: {self._function_name} <br />
+ db_name: {self._db_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class CreateExternalTable(LogicalPlan):
+ def __init__(
+ self,
+ table_name: str,
+ path: str,
+ source: Optional[str] = None,
+ schema: Optional[DataType] = None,
+ options: Mapping[str, str] = {},
+ ) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+ self._path = path
+ self._source = source
+ self._schema = schema
+ self._options = options
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.create_external_table.table_name = self._table_name
+ if self._path is not None:
+ plan.create_external_table.path = self._path
+ if self._source is not None:
+ plan.create_external_table.source = self._source
+ if self._schema is not None:
+
plan.create_external_table.schema.CopyFrom(pyspark_types_to_proto_types(self._schema))
+ for k in self._options.keys():
+ v = self._options.get(k)
+ if v is not None:
+ plan.create_external_table.options[k] = v
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"table_name='{self._table_name}' "
+ f"path='{self._path}' "
+ f"source='{self._source}' "
+ f"schema='{self._schema}' "
+ f"options='{self._options}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ path: {self._path} <br />
+ source: {self._source} <br />
+ schema: {self._schema} <br />
+ options: {self._options} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class CreateTable(LogicalPlan):
+ def __init__(
+ self,
+ table_name: str,
+ path: str,
+ source: Optional[str] = None,
+ description: Optional[str] = None,
+ schema: Optional[DataType] = None,
+ options: Mapping[str, str] = {},
+ ) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+ self._path = path
+ self._source = source
+ self._description = description
+ self._schema = schema
+ self._options = options
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.create_table.table_name = self._table_name
+ if self._path is not None:
+ plan.create_table.path = self._path
+ if self._source is not None:
+ plan.create_table.source = self._source
+ if self._description is not None:
+ plan.create_table.description = self._description
+ if self._schema is not None:
+
plan.create_table.schema.CopyFrom(pyspark_types_to_proto_types(self._schema))
+ for k in self._options.keys():
+ v = self._options.get(k)
+ if v is not None:
+ plan.create_table.options[k] = v
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<{self.__class__.__name__}"
+ f"table_name='{self._table_name}' "
+ f"path='{self._path}' "
+ f"source='{self._source}' "
+ f"description='{self._description}' "
+ f"schema='{self._schema}' "
+ f"options='{self._options}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ path: {self._path} <br />
+ source: {self._source} <br />
+ description: {self._description} <br />
+ schema: {self._schema} <br />
+ options: {self._options} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class DropTempView(LogicalPlan):
+ def __init__(self, view_name: str) -> None:
+ super().__init__(None)
+ self._view_name = view_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.drop_temp_view.view_name = self._view_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}
view_name='{self._view_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ view_name: {self._view_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class DropGlobalTempView(LogicalPlan):
+ def __init__(self, view_name: str) -> None:
+ super().__init__(None)
+ self._view_name = view_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.drop_global_temp_view.view_name = self._view_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}
view_name='{self._view_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ view_name: {self._view_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class RecoverPartitions(LogicalPlan):
+ def __init__(self, table_name: str) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.recover_partitions.table_name = self._table_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}
table_name='{self._table_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+# TODO(SPARK-41612): Support Catalog.isCached
+# class IsCached(LogicalPlan):
+# def __init__(self, table_name: str) -> None:
+# super().__init__(None)
+# self._table_name = table_name
+#
+# def plan(self, session: "SparkConnectClient") -> proto.Relation:
+# plan = proto.Relation()
+# plan.is_cached.table_name = self._table_name
+# return plan
+#
+# def print(self, indent: int = 0) -> str:
+# i = " " * indent
+# return f"{i}" f"<{self.__class__.__name__}
table_name='{self._table_name}'>"
+#
+# def _repr_html_(self) -> str:
+# return f"""
+# <ul>
+# <li>
+# <b>{self.__class__.__name__}</b><br />
+# table_name: {self._table_name} <br />
+# {self._child_repr_()}
+# </li>
+# </ul>
+# """
+#
+#
+# TODO(SPARK-41600): Support Catalog.cacheTable
+# class CacheTable(LogicalPlan):
+# def __init__(self, table_name: str) -> None:
+# super().__init__(None)
+# self._table_name = table_name
+#
+# def plan(self, session: "SparkConnectClient") -> proto.Relation:
+# plan = proto.Relation()
+# plan.cache_table.table_name = self._table_name
+# return plan
+#
+# def print(self, indent: int = 0) -> str:
+# i = " " * indent
+# return f"{i}" f"<{self.__class__.__name__}
table_name='{self._table_name}'>"
+#
+# def _repr_html_(self) -> str:
+# return f"""
+# <ul>
+# <li>
+# <b>{self.__class__.__name__}</b><br />
+# table_name: {self._table_name} <br />
+# {self._child_repr_()}
+# </li>
+# </ul>
+# """
+#
+#
+# TODO(SPARK-41623): Support Catalog.uncacheTable
+# class UncacheTable(LogicalPlan):
+# def __init__(self, table_name: str) -> None:
+# super().__init__(None)
+# self._table_name = table_name
+#
+# def plan(self, session: "SparkConnectClient") -> proto.Relation:
+# plan = proto.Relation()
+# plan.uncache_table.table_name = self._table_name
+# return plan
+#
+# def print(self, indent: int = 0) -> str:
+# i = " " * indent
+# return f"{i}" f"<{self.__class__.__name__}
table_name='{self._table_name}'>"
+#
+# def _repr_html_(self) -> str:
+# return f"""
+# <ul>
+# <li>
+# <b>{self.__class__.__name__}</b><br />
+# table_name: {self._table_name} <br />
+# {self._child_repr_()}
+# </li>
+# </ul>
+# """
+
+
+class ClearCache(LogicalPlan):
+ def __init__(self) -> None:
+ super().__init__(None)
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ return proto.Relation(clear_cache=proto.ClearCache())
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b>
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class RefreshTable(LogicalPlan):
+ def __init__(self, table_name: str) -> None:
+ super().__init__(None)
+ self._table_name = table_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.refresh_table.table_name = self._table_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}
table_name='{self._table_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ table_name: {self._table_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class RefreshByPath(LogicalPlan):
+ def __init__(self, path: str) -> None:
+ super().__init__(None)
+ self._path = path
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.refresh_by_path.path = self._path
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__} path='{self._path}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ path: {self._path} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class CurrentCatalog(LogicalPlan):
+ def __init__(self) -> None:
+ super().__init__(None)
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ return proto.Relation(current_catalog=proto.CurrentCatalog())
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b>
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class SetCurrentCatalog(LogicalPlan):
+ def __init__(self, catalog_name: str) -> None:
+ super().__init__(None)
+ self._catalog_name = catalog_name
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = proto.Relation()
+ plan.set_current_catalog.catalog_name = self._catalog_name
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}
catalog_name='{self._catalog_name}'>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b><br />
+ catalog_name: {self._catalog_name} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
+class ListCatalogs(LogicalPlan):
+ def __init__(self) -> None:
+ super().__init__(None)
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ return proto.Relation(list_catalogs=proto.ListCatalogs())
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"{i}" f"<{self.__class__.__name__}>"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>{self.__class__.__name__}</b>
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
diff --git a/python/pyspark/sql/connect/proto/__init__.py b/python/pyspark/sql/connect/proto/__init__.py
index f00b1a74c1d..a651a844e15 100644
--- a/python/pyspark/sql/connect/proto/__init__.py
+++ b/python/pyspark/sql/connect/proto/__init__.py
@@ -21,3 +21,4 @@ from pyspark.sql.connect.proto.types_pb2 import *
from pyspark.sql.connect.proto.commands_pb2 import *
from pyspark.sql.connect.proto.expressions_pb2 import *
from pyspark.sql.connect.proto.relations_pb2 import *
+from pyspark.sql.connect.proto.catalog_pb2 import *
diff --git a/python/pyspark/sql/connect/proto/catalog_pb2.py b/python/pyspark/sql/connect/proto/catalog_pb2.py
new file mode 100644
index 00000000000..65b3e4e584e
--- /dev/null
+++ b/python/pyspark/sql/connect/proto/catalog_pb2.py
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: spark/connect/catalog.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
+
b'\n\x1bspark/connect/catalog.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"\x11\n\x0f\x43urrentDatabase"-\n\x12SetCurrentDatabase\x12\x17\n\x07\x64\x62_name\x18\x01
\x01(\tR\x06\x64\x62Name"\x0f\n\rListDatabases"6\n\nListTables\x12\x1c\n\x07\x64\x62_name\x18\x01
\x01(\tH\x00R\x06\x64\x62Name\x88\x01\x01\x42\n\n\x08_db_name"9\n\rListFunctions\x12\x1c\n\x07\x64\x62_name\x18\x01
\x01(\tH\x00R\x06\x64\x62Name\x88\x01\x01\x42\n\n\x08_db_name"V\n\x0bListColumns\x12\x1d\n\ntabl
[...]
+)
+
+
+_CURRENTDATABASE = DESCRIPTOR.message_types_by_name["CurrentDatabase"]
+_SETCURRENTDATABASE = DESCRIPTOR.message_types_by_name["SetCurrentDatabase"]
+_LISTDATABASES = DESCRIPTOR.message_types_by_name["ListDatabases"]
+_LISTTABLES = DESCRIPTOR.message_types_by_name["ListTables"]
+_LISTFUNCTIONS = DESCRIPTOR.message_types_by_name["ListFunctions"]
+_LISTCOLUMNS = DESCRIPTOR.message_types_by_name["ListColumns"]
+_GETDATABASE = DESCRIPTOR.message_types_by_name["GetDatabase"]
+_GETTABLE = DESCRIPTOR.message_types_by_name["GetTable"]
+_GETFUNCTION = DESCRIPTOR.message_types_by_name["GetFunction"]
+_DATABASEEXISTS = DESCRIPTOR.message_types_by_name["DatabaseExists"]
+_TABLEEXISTS = DESCRIPTOR.message_types_by_name["TableExists"]
+_FUNCTIONEXISTS = DESCRIPTOR.message_types_by_name["FunctionExists"]
+_CREATEEXTERNALTABLE = DESCRIPTOR.message_types_by_name["CreateExternalTable"]
+_CREATEEXTERNALTABLE_OPTIONSENTRY =
_CREATEEXTERNALTABLE.nested_types_by_name["OptionsEntry"]
+_CREATETABLE = DESCRIPTOR.message_types_by_name["CreateTable"]
+_CREATETABLE_OPTIONSENTRY = _CREATETABLE.nested_types_by_name["OptionsEntry"]
+_DROPTEMPVIEW = DESCRIPTOR.message_types_by_name["DropTempView"]
+_DROPGLOBALTEMPVIEW = DESCRIPTOR.message_types_by_name["DropGlobalTempView"]
+_RECOVERPARTITIONS = DESCRIPTOR.message_types_by_name["RecoverPartitions"]
+_CLEARCACHE = DESCRIPTOR.message_types_by_name["ClearCache"]
+_REFRESHTABLE = DESCRIPTOR.message_types_by_name["RefreshTable"]
+_REFRESHBYPATH = DESCRIPTOR.message_types_by_name["RefreshByPath"]
+_CURRENTCATALOG = DESCRIPTOR.message_types_by_name["CurrentCatalog"]
+_SETCURRENTCATALOG = DESCRIPTOR.message_types_by_name["SetCurrentCatalog"]
+_LISTCATALOGS = DESCRIPTOR.message_types_by_name["ListCatalogs"]
+CurrentDatabase = _reflection.GeneratedProtocolMessageType(
+ "CurrentDatabase",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _CURRENTDATABASE,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.CurrentDatabase)
+ },
+)
+_sym_db.RegisterMessage(CurrentDatabase)
+
+SetCurrentDatabase = _reflection.GeneratedProtocolMessageType(
+ "SetCurrentDatabase",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _SETCURRENTDATABASE,
+ "__module__": "spark.connect.catalog_pb2"
+ #
@@protoc_insertion_point(class_scope:spark.connect.SetCurrentDatabase)
+ },
+)
+_sym_db.RegisterMessage(SetCurrentDatabase)
+
+ListDatabases = _reflection.GeneratedProtocolMessageType(
+ "ListDatabases",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _LISTDATABASES,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.ListDatabases)
+ },
+)
+_sym_db.RegisterMessage(ListDatabases)
+
+ListTables = _reflection.GeneratedProtocolMessageType(
+ "ListTables",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _LISTTABLES,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.ListTables)
+ },
+)
+_sym_db.RegisterMessage(ListTables)
+
+ListFunctions = _reflection.GeneratedProtocolMessageType(
+ "ListFunctions",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _LISTFUNCTIONS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.ListFunctions)
+ },
+)
+_sym_db.RegisterMessage(ListFunctions)
+
+ListColumns = _reflection.GeneratedProtocolMessageType(
+ "ListColumns",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _LISTCOLUMNS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.ListColumns)
+ },
+)
+_sym_db.RegisterMessage(ListColumns)
+
+GetDatabase = _reflection.GeneratedProtocolMessageType(
+ "GetDatabase",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _GETDATABASE,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.GetDatabase)
+ },
+)
+_sym_db.RegisterMessage(GetDatabase)
+
+GetTable = _reflection.GeneratedProtocolMessageType(
+ "GetTable",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _GETTABLE,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.GetTable)
+ },
+)
+_sym_db.RegisterMessage(GetTable)
+
+GetFunction = _reflection.GeneratedProtocolMessageType(
+ "GetFunction",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _GETFUNCTION,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.GetFunction)
+ },
+)
+_sym_db.RegisterMessage(GetFunction)
+
+DatabaseExists = _reflection.GeneratedProtocolMessageType(
+ "DatabaseExists",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _DATABASEEXISTS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.DatabaseExists)
+ },
+)
+_sym_db.RegisterMessage(DatabaseExists)
+
+TableExists = _reflection.GeneratedProtocolMessageType(
+ "TableExists",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _TABLEEXISTS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.TableExists)
+ },
+)
+_sym_db.RegisterMessage(TableExists)
+
+FunctionExists = _reflection.GeneratedProtocolMessageType(
+ "FunctionExists",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _FUNCTIONEXISTS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.FunctionExists)
+ },
+)
+_sym_db.RegisterMessage(FunctionExists)
+
+CreateExternalTable = _reflection.GeneratedProtocolMessageType(
+ "CreateExternalTable",
+ (_message.Message,),
+ {
+ "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+ "OptionsEntry",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _CREATEEXTERNALTABLE_OPTIONSENTRY,
+ "__module__": "spark.connect.catalog_pb2"
+ #
@@protoc_insertion_point(class_scope:spark.connect.CreateExternalTable.OptionsEntry)
+ },
+ ),
+ "DESCRIPTOR": _CREATEEXTERNALTABLE,
+ "__module__": "spark.connect.catalog_pb2"
+ #
@@protoc_insertion_point(class_scope:spark.connect.CreateExternalTable)
+ },
+)
+_sym_db.RegisterMessage(CreateExternalTable)
+_sym_db.RegisterMessage(CreateExternalTable.OptionsEntry)
+
+CreateTable = _reflection.GeneratedProtocolMessageType(
+ "CreateTable",
+ (_message.Message,),
+ {
+ "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+ "OptionsEntry",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _CREATETABLE_OPTIONSENTRY,
+ "__module__": "spark.connect.catalog_pb2"
+ #
@@protoc_insertion_point(class_scope:spark.connect.CreateTable.OptionsEntry)
+ },
+ ),
+ "DESCRIPTOR": _CREATETABLE,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.CreateTable)
+ },
+)
+_sym_db.RegisterMessage(CreateTable)
+_sym_db.RegisterMessage(CreateTable.OptionsEntry)
+
+DropTempView = _reflection.GeneratedProtocolMessageType(
+ "DropTempView",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _DROPTEMPVIEW,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.DropTempView)
+ },
+)
+_sym_db.RegisterMessage(DropTempView)
+
+DropGlobalTempView = _reflection.GeneratedProtocolMessageType(
+ "DropGlobalTempView",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _DROPGLOBALTEMPVIEW,
+ "__module__": "spark.connect.catalog_pb2"
+ #
@@protoc_insertion_point(class_scope:spark.connect.DropGlobalTempView)
+ },
+)
+_sym_db.RegisterMessage(DropGlobalTempView)
+
+RecoverPartitions = _reflection.GeneratedProtocolMessageType(
+ "RecoverPartitions",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _RECOVERPARTITIONS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.RecoverPartitions)
+ },
+)
+_sym_db.RegisterMessage(RecoverPartitions)
+
+ClearCache = _reflection.GeneratedProtocolMessageType(
+ "ClearCache",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _CLEARCACHE,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.ClearCache)
+ },
+)
+_sym_db.RegisterMessage(ClearCache)
+
+RefreshTable = _reflection.GeneratedProtocolMessageType(
+ "RefreshTable",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _REFRESHTABLE,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.RefreshTable)
+ },
+)
+_sym_db.RegisterMessage(RefreshTable)
+
+RefreshByPath = _reflection.GeneratedProtocolMessageType(
+ "RefreshByPath",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _REFRESHBYPATH,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.RefreshByPath)
+ },
+)
+_sym_db.RegisterMessage(RefreshByPath)
+
+CurrentCatalog = _reflection.GeneratedProtocolMessageType(
+ "CurrentCatalog",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _CURRENTCATALOG,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.CurrentCatalog)
+ },
+)
+_sym_db.RegisterMessage(CurrentCatalog)
+
+SetCurrentCatalog = _reflection.GeneratedProtocolMessageType(
+ "SetCurrentCatalog",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _SETCURRENTCATALOG,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.SetCurrentCatalog)
+ },
+)
+_sym_db.RegisterMessage(SetCurrentCatalog)
+
+ListCatalogs = _reflection.GeneratedProtocolMessageType(
+ "ListCatalogs",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _LISTCATALOGS,
+ "__module__": "spark.connect.catalog_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.ListCatalogs)
+ },
+)
+_sym_db.RegisterMessage(ListCatalogs)
+
+if _descriptor._USE_C_DESCRIPTORS == False:
+
+ DESCRIPTOR._options = None
+ DESCRIPTOR._serialized_options =
b"\n\036org.apache.spark.connect.protoP\001"
+ _CREATEEXTERNALTABLE_OPTIONSENTRY._options = None
+ _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_options = b"8\001"
+ _CREATETABLE_OPTIONSENTRY._options = None
+ _CREATETABLE_OPTIONSENTRY._serialized_options = b"8\001"
+ _CURRENTDATABASE._serialized_start = 73
+ _CURRENTDATABASE._serialized_end = 90
+ _SETCURRENTDATABASE._serialized_start = 92
+ _SETCURRENTDATABASE._serialized_end = 137
+ _LISTDATABASES._serialized_start = 139
+ _LISTDATABASES._serialized_end = 154
+ _LISTTABLES._serialized_start = 156
+ _LISTTABLES._serialized_end = 210
+ _LISTFUNCTIONS._serialized_start = 212
+ _LISTFUNCTIONS._serialized_end = 269
+ _LISTCOLUMNS._serialized_start = 271
+ _LISTCOLUMNS._serialized_end = 357
+ _GETDATABASE._serialized_start = 359
+ _GETDATABASE._serialized_end = 397
+ _GETTABLE._serialized_start = 399
+ _GETTABLE._serialized_end = 482
+ _GETFUNCTION._serialized_start = 484
+ _GETFUNCTION._serialized_end = 576
+ _DATABASEEXISTS._serialized_start = 578
+ _DATABASEEXISTS._serialized_end = 619
+ _TABLEEXISTS._serialized_start = 621
+ _TABLEEXISTS._serialized_end = 707
+ _FUNCTIONEXISTS._serialized_start = 709
+ _FUNCTIONEXISTS._serialized_end = 804
+ _CREATEEXTERNALTABLE._serialized_start = 807
+ _CREATEEXTERNALTABLE._serialized_end = 1133
+ _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_start = 1044
+ _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_end = 1102
+ _CREATETABLE._serialized_start = 1136
+ _CREATETABLE._serialized_end = 1501
+ _CREATETABLE_OPTIONSENTRY._serialized_start = 1044
+ _CREATETABLE_OPTIONSENTRY._serialized_end = 1102
+ _DROPTEMPVIEW._serialized_start = 1503
+ _DROPTEMPVIEW._serialized_end = 1546
+ _DROPGLOBALTEMPVIEW._serialized_start = 1548
+ _DROPGLOBALTEMPVIEW._serialized_end = 1597
+ _RECOVERPARTITIONS._serialized_start = 1599
+ _RECOVERPARTITIONS._serialized_end = 1649
+ _CLEARCACHE._serialized_start = 1651
+ _CLEARCACHE._serialized_end = 1663
+ _REFRESHTABLE._serialized_start = 1665
+ _REFRESHTABLE._serialized_end = 1710
+ _REFRESHBYPATH._serialized_start = 1712
+ _REFRESHBYPATH._serialized_end = 1747
+ _CURRENTCATALOG._serialized_start = 1749
+ _CURRENTCATALOG._serialized_end = 1765
+ _SETCURRENTCATALOG._serialized_start = 1767
+ _SETCURRENTCATALOG._serialized_end = 1821
+ _LISTCATALOGS._serialized_start = 1823
+ _LISTCATALOGS._serialized_end = 1837
+# @@protoc_insertion_point(module_scope)
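
The generated `catalog_pb2` messages behave like any other protobuf classes, so they round-trip through the standard serialization API. A quick illustrative check (no running server required; "default" is a placeholder database name):

    from pyspark.sql.connect.proto import catalog_pb2

    msg = catalog_pb2.SetCurrentDatabase(db_name="default")
    data = msg.SerializeToString()
    assert catalog_pb2.SetCurrentDatabase.FromString(data).db_name == "default"
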
diff --git a/python/pyspark/sql/connect/proto/catalog_pb2.pyi b/python/pyspark/sql/connect/proto/catalog_pb2.pyi
new file mode 100644
index 00000000000..1322d41f472
--- /dev/null
+++ b/python/pyspark/sql/connect/proto/catalog_pb2.pyi
@@ -0,0 +1,722 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+@generated by mypy-protobuf. Do not edit manually!
+isort:skip_file
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import builtins
+import collections.abc
+import google.protobuf.descriptor
+import google.protobuf.internal.containers
+import google.protobuf.message
+import pyspark.sql.connect.proto.types_pb2
+import sys
+import typing
+
+if sys.version_info >= (3, 8):
+ import typing as typing_extensions
+else:
+ import typing_extensions
+
+DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
+
+class CurrentDatabase(google.protobuf.message.Message):
+ """See `spark.catalog.currentDatabase`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+global___CurrentDatabase = CurrentDatabase
+
+class SetCurrentDatabase(google.protobuf.message.Message):
+ """See `spark.catalog.setCurrentDatabase`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DB_NAME_FIELD_NUMBER: builtins.int
+ db_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ db_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(self, field_name: typing_extensions.Literal["db_name",
b"db_name"]) -> None: ...
+
+global___SetCurrentDatabase = SetCurrentDatabase
+
+class ListDatabases(google.protobuf.message.Message):
+ """See `spark.catalog.listDatabases`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+global___ListDatabases = ListDatabases
+
+class ListTables(google.protobuf.message.Message):
+ """See `spark.catalog.listTables`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DB_NAME_FIELD_NUMBER: builtins.int
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___ListTables = ListTables
+
+class ListFunctions(google.protobuf.message.Message):
+ """See `spark.catalog.listFunctions`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DB_NAME_FIELD_NUMBER: builtins.int
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___ListFunctions = ListFunctions
+
+class ListColumns(google.protobuf.message.Message):
+ """See `spark.catalog.listColumns`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ DB_NAME_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_db_name", b"_db_name", "db_name", b"db_name", "table_name",
b"table_name"
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___ListColumns = ListColumns
+
+class GetDatabase(google.protobuf.message.Message):
+ """See `spark.catalog.getDatabase`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DB_NAME_FIELD_NUMBER: builtins.int
+ db_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ db_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(self, field_name: typing_extensions.Literal["db_name",
b"db_name"]) -> None: ...
+
+global___GetDatabase = GetDatabase
+
+class GetTable(google.protobuf.message.Message):
+ """See `spark.catalog.getTable`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ DB_NAME_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_db_name", b"_db_name", "db_name", b"db_name", "table_name",
b"table_name"
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___GetTable = GetTable
+
+class GetFunction(google.protobuf.message.Message):
+ """See `spark.catalog.getFunction`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ FUNCTION_NAME_FIELD_NUMBER: builtins.int
+ DB_NAME_FIELD_NUMBER: builtins.int
+ function_name: builtins.str
+ """(Required)"""
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ function_name: builtins.str = ...,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_db_name", b"_db_name", "db_name", b"db_name", "function_name",
b"function_name"
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___GetFunction = GetFunction
+
+class DatabaseExists(google.protobuf.message.Message):
+ """See `spark.catalog.databaseExists`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DB_NAME_FIELD_NUMBER: builtins.int
+ db_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ db_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(self, field_name: typing_extensions.Literal["db_name",
b"db_name"]) -> None: ...
+
+global___DatabaseExists = DatabaseExists
+
+class TableExists(google.protobuf.message.Message):
+ """See `spark.catalog.tableExists`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ DB_NAME_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_db_name", b"_db_name", "db_name", b"db_name", "table_name",
b"table_name"
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___TableExists = TableExists
+
+class FunctionExists(google.protobuf.message.Message):
+ """See `spark.catalog.functionExists`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ FUNCTION_NAME_FIELD_NUMBER: builtins.int
+ DB_NAME_FIELD_NUMBER: builtins.int
+ function_name: builtins.str
+ """(Required)"""
+ db_name: builtins.str
+ """(Optional)"""
+ def __init__(
+ self,
+ *,
+ function_name: builtins.str = ...,
+ db_name: builtins.str | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["_db_name", b"_db_name",
"db_name", b"db_name"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_db_name", b"_db_name", "db_name", b"db_name", "function_name",
b"function_name"
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+ ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___FunctionExists = FunctionExists
+
+class CreateExternalTable(google.protobuf.message.Message):
+ """See `spark.catalog.createExternalTable`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class OptionsEntry(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ KEY_FIELD_NUMBER: builtins.int
+ VALUE_FIELD_NUMBER: builtins.int
+ key: builtins.str
+ value: builtins.str
+ def __init__(
+ self,
+ *,
+ key: builtins.str = ...,
+ value: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["key", b"key",
"value", b"value"]
+ ) -> None: ...
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ PATH_FIELD_NUMBER: builtins.int
+ SOURCE_FIELD_NUMBER: builtins.int
+ SCHEMA_FIELD_NUMBER: builtins.int
+ OPTIONS_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ path: builtins.str
+ """(Optional)"""
+ source: builtins.str
+ """(Optional)"""
+ @property
+ def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+ """(Optional)"""
+ @property
+ def options(self) ->
google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+ """Options could be empty for valid data source format.
+ The map key is case insensitive.
+ """
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ path: builtins.str | None = ...,
+ source: builtins.str | None = ...,
+ schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+ options: collections.abc.Mapping[builtins.str, builtins.str] | None =
...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_path",
+ b"_path",
+ "_schema",
+ b"_schema",
+ "_source",
+ b"_source",
+ "path",
+ b"path",
+ "schema",
+ b"schema",
+ "source",
+ b"source",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_path",
+ b"_path",
+ "_schema",
+ b"_schema",
+ "_source",
+ b"_source",
+ "options",
+ b"options",
+ "path",
+ b"path",
+ "schema",
+ b"schema",
+ "source",
+ b"source",
+ "table_name",
+ b"table_name",
+ ],
+ ) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_path", b"_path"]
+ ) -> typing_extensions.Literal["path"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+ ) -> typing_extensions.Literal["schema"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_source", b"_source"]
+ ) -> typing_extensions.Literal["source"] | None: ...
+
+global___CreateExternalTable = CreateExternalTable
+
+class CreateTable(google.protobuf.message.Message):
+ """See `spark.catalog.createTable`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class OptionsEntry(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ KEY_FIELD_NUMBER: builtins.int
+ VALUE_FIELD_NUMBER: builtins.int
+ key: builtins.str
+ value: builtins.str
+ def __init__(
+ self,
+ *,
+ key: builtins.str = ...,
+ value: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["key", b"key",
"value", b"value"]
+ ) -> None: ...
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ PATH_FIELD_NUMBER: builtins.int
+ SOURCE_FIELD_NUMBER: builtins.int
+ DESCRIPTION_FIELD_NUMBER: builtins.int
+ SCHEMA_FIELD_NUMBER: builtins.int
+ OPTIONS_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ path: builtins.str
+ """(Optional)"""
+ source: builtins.str
+ """(Optional)"""
+ description: builtins.str
+ """(Optional)"""
+ @property
+ def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+ """(Optional)"""
+ @property
+ def options(self) ->
google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+ """Options could be empty for valid data source format.
+ The map key is case insensitive.
+ """
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ path: builtins.str | None = ...,
+ source: builtins.str | None = ...,
+ description: builtins.str | None = ...,
+ schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+ options: collections.abc.Mapping[builtins.str, builtins.str] | None =
...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_description",
+ b"_description",
+ "_path",
+ b"_path",
+ "_schema",
+ b"_schema",
+ "_source",
+ b"_source",
+ "description",
+ b"description",
+ "path",
+ b"path",
+ "schema",
+ b"schema",
+ "source",
+ b"source",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_description",
+ b"_description",
+ "_path",
+ b"_path",
+ "_schema",
+ b"_schema",
+ "_source",
+ b"_source",
+ "description",
+ b"description",
+ "options",
+ b"options",
+ "path",
+ b"path",
+ "schema",
+ b"schema",
+ "source",
+ b"source",
+ "table_name",
+ b"table_name",
+ ],
+ ) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_description",
b"_description"]
+ ) -> typing_extensions.Literal["description"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_path", b"_path"]
+ ) -> typing_extensions.Literal["path"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+ ) -> typing_extensions.Literal["schema"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_source", b"_source"]
+ ) -> typing_extensions.Literal["source"] | None: ...
+
+global___CreateTable = CreateTable
+
+class DropTempView(google.protobuf.message.Message):
+ """See `spark.catalog.dropTempView`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ VIEW_NAME_FIELD_NUMBER: builtins.int
+ view_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ view_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["view_name", b"view_name"]
+ ) -> None: ...
+
+global___DropTempView = DropTempView
+
+class DropGlobalTempView(google.protobuf.message.Message):
+ """See `spark.catalog.dropGlobalTempView`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ VIEW_NAME_FIELD_NUMBER: builtins.int
+ view_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ view_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["view_name", b"view_name"]
+ ) -> None: ...
+
+global___DropGlobalTempView = DropGlobalTempView
+
+class RecoverPartitions(google.protobuf.message.Message):
+ """See `spark.catalog.recoverPartitions`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["table_name",
b"table_name"]
+ ) -> None: ...
+
+global___RecoverPartitions = RecoverPartitions
+
+class ClearCache(google.protobuf.message.Message):
+ """TODO(SPARK-41612): Support Catalog.isCached
+ // See `spark.catalog.isCached`
+ message IsCached {
+ // (Required)
+ string table_name = 1;
+ }
+
+ TODO(SPARK-41600): Support Catalog.cacheTable
+ // See `spark.catalog.cacheTable`
+ message CacheTable {
+ // (Required)
+ string table_name = 1;
+ }
+
+ TODO(SPARK-41623): Support Catalog.uncacheTable
+ // See `spark.catalog.uncacheTable`
+ message UncacheTable {
+ // (Required)
+ string table_name = 1;
+ }
+
+ See `spark.catalog.clearCache`
+ """
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+global___ClearCache = ClearCache
+
+class RefreshTable(google.protobuf.message.Message):
+ """See `spark.catalog.refreshTable`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TABLE_NAME_FIELD_NUMBER: builtins.int
+ table_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ table_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["table_name",
b"table_name"]
+ ) -> None: ...
+
+global___RefreshTable = RefreshTable
+
+class RefreshByPath(google.protobuf.message.Message):
+ """See `spark.catalog.refreshByPath`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ PATH_FIELD_NUMBER: builtins.int
+ path: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ path: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(self, field_name: typing_extensions.Literal["path",
b"path"]) -> None: ...
+
+global___RefreshByPath = RefreshByPath
+
+class CurrentCatalog(google.protobuf.message.Message):
+ """See `spark.catalog.currentCatalog`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+global___CurrentCatalog = CurrentCatalog
+
+class SetCurrentCatalog(google.protobuf.message.Message):
+ """See `spark.catalog.setCurrentCatalog`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ CATALOG_NAME_FIELD_NUMBER: builtins.int
+ catalog_name: builtins.str
+ """(Required)"""
+ def __init__(
+ self,
+ *,
+ catalog_name: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["catalog_name",
b"catalog_name"]
+ ) -> None: ...
+
+global___SetCurrentCatalog = SetCurrentCatalog
+
+class ListCatalogs(google.protobuf.message.Message):
+ """See `spark.catalog.listCatalogs`"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+global___ListCatalogs = ListCatalogs
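
As the stubs above show, `db_name` is an optional field on the listing and lookup messages, so its presence can be checked explicitly. An illustrative example (message names are re-exported from `pyspark.sql.connect.proto`; "default" and "people" are placeholders):

    from pyspark.sql.connect.proto import ListTables, TableExists

    lt = ListTables()
    assert not lt.HasField("db_name")  # optional field, unset by default
    lt.db_name = "default"
    assert lt.WhichOneof("_db_name") == "db_name"

    te = TableExists(table_name="people", db_name="default")
    assert te.table_name == "people"
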
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 6c9f9552e06..576a9b7ba1e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -31,10 +31,11 @@ _sym_db = _symbol_database.Default()
from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
+from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catalog__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto"\xf7\x0e\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04
\x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilte [...]
+
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x85\x1c\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04
\x01(\x0b\x32\x15.spa [...]
)
@@ -525,90 +526,90 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
_RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._options = None
_RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_options =
b"8\001"
- _RELATION._serialized_start = 109
- _RELATION._serialized_end = 2020
- _UNKNOWN._serialized_start = 2022
- _UNKNOWN._serialized_end = 2031
- _RELATIONCOMMON._serialized_start = 2033
- _RELATIONCOMMON._serialized_end = 2082
- _SQL._serialized_start = 2084
- _SQL._serialized_end = 2111
- _READ._serialized_start = 2114
- _READ._serialized_end = 2540
- _READ_NAMEDTABLE._serialized_start = 2256
- _READ_NAMEDTABLE._serialized_end = 2317
- _READ_DATASOURCE._serialized_start = 2320
- _READ_DATASOURCE._serialized_end = 2527
- _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 2458
- _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 2516
- _PROJECT._serialized_start = 2542
- _PROJECT._serialized_end = 2659
- _FILTER._serialized_start = 2661
- _FILTER._serialized_end = 2773
- _JOIN._serialized_start = 2776
- _JOIN._serialized_end = 3247
- _JOIN_JOINTYPE._serialized_start = 3039
- _JOIN_JOINTYPE._serialized_end = 3247
- _SETOPERATION._serialized_start = 3250
- _SETOPERATION._serialized_end = 3646
- _SETOPERATION_SETOPTYPE._serialized_start = 3509
- _SETOPERATION_SETOPTYPE._serialized_end = 3623
- _LIMIT._serialized_start = 3648
- _LIMIT._serialized_end = 3724
- _OFFSET._serialized_start = 3726
- _OFFSET._serialized_end = 3805
- _TAIL._serialized_start = 3807
- _TAIL._serialized_end = 3882
- _AGGREGATE._serialized_start = 3885
- _AGGREGATE._serialized_end = 4467
- _AGGREGATE_PIVOT._serialized_start = 4224
- _AGGREGATE_PIVOT._serialized_end = 4335
- _AGGREGATE_GROUPTYPE._serialized_start = 4338
- _AGGREGATE_GROUPTYPE._serialized_end = 4467
- _SORT._serialized_start = 4470
- _SORT._serialized_end = 4630
- _DROP._serialized_start = 4632
- _DROP._serialized_end = 4732
- _DEDUPLICATE._serialized_start = 4735
- _DEDUPLICATE._serialized_end = 4906
- _LOCALRELATION._serialized_start = 4909
- _LOCALRELATION._serialized_end = 5046
- _SAMPLE._serialized_start = 5049
- _SAMPLE._serialized_end = 5344
- _RANGE._serialized_start = 5347
- _RANGE._serialized_end = 5492
- _SUBQUERYALIAS._serialized_start = 5494
- _SUBQUERYALIAS._serialized_end = 5608
- _REPARTITION._serialized_start = 5611
- _REPARTITION._serialized_end = 5753
- _SHOWSTRING._serialized_start = 5756
- _SHOWSTRING._serialized_end = 5898
- _STATSUMMARY._serialized_start = 5900
- _STATSUMMARY._serialized_end = 5992
- _STATDESCRIBE._serialized_start = 5994
- _STATDESCRIBE._serialized_end = 6075
- _STATCROSSTAB._serialized_start = 6077
- _STATCROSSTAB._serialized_end = 6178
- _NAFILL._serialized_start = 6181
- _NAFILL._serialized_end = 6315
- _NADROP._serialized_start = 6318
- _NADROP._serialized_end = 6452
- _NAREPLACE._serialized_start = 6455
- _NAREPLACE._serialized_end = 6751
- _NAREPLACE_REPLACEMENT._serialized_start = 6610
- _NAREPLACE_REPLACEMENT._serialized_end = 6751
- _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 6753
- _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 6867
- _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 6870
- _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 7129
- _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start =
7062
- _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 7129
- _WITHCOLUMNS._serialized_start = 7132
- _WITHCOLUMNS._serialized_end = 7263
- _HINT._serialized_start = 7266
- _HINT._serialized_end = 7406
- _UNPIVOT._serialized_start = 7409
- _UNPIVOT._serialized_end = 7655
- _TOSCHEMA._serialized_start = 7657
- _TOSCHEMA._serialized_end = 7763
+ _RELATION._serialized_start = 138
+ _RELATION._serialized_end = 3727
+ _UNKNOWN._serialized_start = 3729
+ _UNKNOWN._serialized_end = 3738
+ _RELATIONCOMMON._serialized_start = 3740
+ _RELATIONCOMMON._serialized_end = 3789
+ _SQL._serialized_start = 3791
+ _SQL._serialized_end = 3818
+ _READ._serialized_start = 3821
+ _READ._serialized_end = 4247
+ _READ_NAMEDTABLE._serialized_start = 3963
+ _READ_NAMEDTABLE._serialized_end = 4024
+ _READ_DATASOURCE._serialized_start = 4027
+ _READ_DATASOURCE._serialized_end = 4234
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4165
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4223
+ _PROJECT._serialized_start = 4249
+ _PROJECT._serialized_end = 4366
+ _FILTER._serialized_start = 4368
+ _FILTER._serialized_end = 4480
+ _JOIN._serialized_start = 4483
+ _JOIN._serialized_end = 4954
+ _JOIN_JOINTYPE._serialized_start = 4746
+ _JOIN_JOINTYPE._serialized_end = 4954
+ _SETOPERATION._serialized_start = 4957
+ _SETOPERATION._serialized_end = 5353
+ _SETOPERATION_SETOPTYPE._serialized_start = 5216
+ _SETOPERATION_SETOPTYPE._serialized_end = 5330
+ _LIMIT._serialized_start = 5355
+ _LIMIT._serialized_end = 5431
+ _OFFSET._serialized_start = 5433
+ _OFFSET._serialized_end = 5512
+ _TAIL._serialized_start = 5514
+ _TAIL._serialized_end = 5589
+ _AGGREGATE._serialized_start = 5592
+ _AGGREGATE._serialized_end = 6174
+ _AGGREGATE_PIVOT._serialized_start = 5931
+ _AGGREGATE_PIVOT._serialized_end = 6042
+ _AGGREGATE_GROUPTYPE._serialized_start = 6045
+ _AGGREGATE_GROUPTYPE._serialized_end = 6174
+ _SORT._serialized_start = 6177
+ _SORT._serialized_end = 6337
+ _DROP._serialized_start = 6339
+ _DROP._serialized_end = 6439
+ _DEDUPLICATE._serialized_start = 6442
+ _DEDUPLICATE._serialized_end = 6613
+ _LOCALRELATION._serialized_start = 6616
+ _LOCALRELATION._serialized_end = 6753
+ _SAMPLE._serialized_start = 6756
+ _SAMPLE._serialized_end = 7051
+ _RANGE._serialized_start = 7054
+ _RANGE._serialized_end = 7199
+ _SUBQUERYALIAS._serialized_start = 7201
+ _SUBQUERYALIAS._serialized_end = 7315
+ _REPARTITION._serialized_start = 7318
+ _REPARTITION._serialized_end = 7460
+ _SHOWSTRING._serialized_start = 7463
+ _SHOWSTRING._serialized_end = 7605
+ _STATSUMMARY._serialized_start = 7607
+ _STATSUMMARY._serialized_end = 7699
+ _STATDESCRIBE._serialized_start = 7701
+ _STATDESCRIBE._serialized_end = 7782
+ _STATCROSSTAB._serialized_start = 7784
+ _STATCROSSTAB._serialized_end = 7885
+ _NAFILL._serialized_start = 7888
+ _NAFILL._serialized_end = 8022
+ _NADROP._serialized_start = 8025
+ _NADROP._serialized_end = 8159
+ _NAREPLACE._serialized_start = 8162
+ _NAREPLACE._serialized_end = 8458
+ _NAREPLACE_REPLACEMENT._serialized_start = 8317
+ _NAREPLACE_REPLACEMENT._serialized_end = 8458
+ _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 8460
+ _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 8574
+ _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 8577
+ _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 8836
+ _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start =
8769
+ _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 8836
+ _WITHCOLUMNS._serialized_start = 8839
+ _WITHCOLUMNS._serialized_end = 8970
+ _HINT._serialized_start = 8973
+ _HINT._serialized_end = 9113
+ _UNPIVOT._serialized_start = 9116
+ _UNPIVOT._serialized_end = 9362
+ _TOSCHEMA._serialized_start = 9364
+ _TOSCHEMA._serialized_end = 9470
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index 57f1177d885..22a0c778a3b 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -39,6 +39,7 @@ import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
+import pyspark.sql.connect.proto.catalog_pb2
import pyspark.sql.connect.proto.expressions_pb2
import pyspark.sql.connect.proto.types_pb2
import sys
@@ -92,6 +93,29 @@ class Relation(google.protobuf.message.Message):
SUMMARY_FIELD_NUMBER: builtins.int
CROSSTAB_FIELD_NUMBER: builtins.int
DESCRIBE_FIELD_NUMBER: builtins.int
+ CURRENT_DATABASE_FIELD_NUMBER: builtins.int
+ SET_CURRENT_DATABASE_FIELD_NUMBER: builtins.int
+ LIST_DATABASES_FIELD_NUMBER: builtins.int
+ LIST_TABLES_FIELD_NUMBER: builtins.int
+ LIST_FUNCTIONS_FIELD_NUMBER: builtins.int
+ LIST_COLUMNS_FIELD_NUMBER: builtins.int
+ GET_DATABASE_FIELD_NUMBER: builtins.int
+ GET_TABLE_FIELD_NUMBER: builtins.int
+ GET_FUNCTION_FIELD_NUMBER: builtins.int
+ DATABASE_EXISTS_FIELD_NUMBER: builtins.int
+ TABLE_EXISTS_FIELD_NUMBER: builtins.int
+ FUNCTION_EXISTS_FIELD_NUMBER: builtins.int
+ CREATE_EXTERNAL_TABLE_FIELD_NUMBER: builtins.int
+ CREATE_TABLE_FIELD_NUMBER: builtins.int
+ DROP_TEMP_VIEW_FIELD_NUMBER: builtins.int
+ DROP_GLOBAL_TEMP_VIEW_FIELD_NUMBER: builtins.int
+ RECOVER_PARTITIONS_FIELD_NUMBER: builtins.int
+ CLEAR_CACHE_FIELD_NUMBER: builtins.int
+ REFRESH_TABLE_FIELD_NUMBER: builtins.int
+ REFRESH_BY_PATH_FIELD_NUMBER: builtins.int
+ CURRENT_CATALOG_FIELD_NUMBER: builtins.int
+ SET_CURRENT_CATALOG_FIELD_NUMBER: builtins.int
+ LIST_CATALOGS_FIELD_NUMBER: builtins.int
UNKNOWN_FIELD_NUMBER: builtins.int
@property
def common(self) -> global___RelationCommon: ...
@@ -160,6 +184,62 @@ class Relation(google.protobuf.message.Message):
@property
def describe(self) -> global___StatDescribe: ...
@property
+ def current_database(self) ->
pyspark.sql.connect.proto.catalog_pb2.CurrentDatabase:
+ """Catalog API (internal-only)"""
+ @property
+ def set_current_database(self) ->
pyspark.sql.connect.proto.catalog_pb2.SetCurrentDatabase: ...
+ @property
+ def list_databases(self) ->
pyspark.sql.connect.proto.catalog_pb2.ListDatabases: ...
+ @property
+ def list_tables(self) -> pyspark.sql.connect.proto.catalog_pb2.ListTables:
...
+ @property
+ def list_functions(self) ->
pyspark.sql.connect.proto.catalog_pb2.ListFunctions: ...
+ @property
+ def list_columns(self) ->
pyspark.sql.connect.proto.catalog_pb2.ListColumns: ...
+ @property
+ def get_database(self) ->
pyspark.sql.connect.proto.catalog_pb2.GetDatabase: ...
+ @property
+ def get_table(self) -> pyspark.sql.connect.proto.catalog_pb2.GetTable: ...
+ @property
+ def get_function(self) ->
pyspark.sql.connect.proto.catalog_pb2.GetFunction: ...
+ @property
+ def database_exists(self) ->
pyspark.sql.connect.proto.catalog_pb2.DatabaseExists: ...
+ @property
+ def table_exists(self) ->
pyspark.sql.connect.proto.catalog_pb2.TableExists: ...
+ @property
+ def function_exists(self) ->
pyspark.sql.connect.proto.catalog_pb2.FunctionExists: ...
+ @property
+ def create_external_table(
+ self,
+ ) -> pyspark.sql.connect.proto.catalog_pb2.CreateExternalTable: ...
+ @property
+ def create_table(self) ->
pyspark.sql.connect.proto.catalog_pb2.CreateTable: ...
+ @property
+ def drop_temp_view(self) ->
pyspark.sql.connect.proto.catalog_pb2.DropTempView: ...
+ @property
+ def drop_global_temp_view(self) ->
pyspark.sql.connect.proto.catalog_pb2.DropGlobalTempView: ...
+ @property
+ def recover_partitions(self) ->
pyspark.sql.connect.proto.catalog_pb2.RecoverPartitions: ...
+ @property
+ def clear_cache(self) -> pyspark.sql.connect.proto.catalog_pb2.ClearCache:
+ """TODO(SPARK-41612): Support Catalog.isCached
+ IsCached is_cached = 218;
+ TODO(SPARK-41600): Support Catalog.cacheTable
+ CacheTable cache_table = 219;
+ TODO(SPARK-41623): Support Catalog.uncacheTable
+ UncacheTable uncache_table = 220;
+ """
+ @property
+ def refresh_table(self) ->
pyspark.sql.connect.proto.catalog_pb2.RefreshTable: ...
+ @property
+ def refresh_by_path(self) ->
pyspark.sql.connect.proto.catalog_pb2.RefreshByPath: ...
+ @property
+ def current_catalog(self) ->
pyspark.sql.connect.proto.catalog_pb2.CurrentCatalog: ...
+ @property
+ def set_current_catalog(self) ->
pyspark.sql.connect.proto.catalog_pb2.SetCurrentCatalog: ...
+ @property
+ def list_catalogs(self) ->
pyspark.sql.connect.proto.catalog_pb2.ListCatalogs: ...
+ @property
def unknown(self) -> global___Unknown: ...
def __init__(
self,
@@ -196,6 +276,31 @@ class Relation(google.protobuf.message.Message):
summary: global___StatSummary | None = ...,
crosstab: global___StatCrosstab | None = ...,
describe: global___StatDescribe | None = ...,
+ current_database:
pyspark.sql.connect.proto.catalog_pb2.CurrentDatabase | None = ...,
+ set_current_database:
pyspark.sql.connect.proto.catalog_pb2.SetCurrentDatabase | None = ...,
+ list_databases: pyspark.sql.connect.proto.catalog_pb2.ListDatabases |
None = ...,
+ list_tables: pyspark.sql.connect.proto.catalog_pb2.ListTables | None =
...,
+ list_functions: pyspark.sql.connect.proto.catalog_pb2.ListFunctions |
None = ...,
+ list_columns: pyspark.sql.connect.proto.catalog_pb2.ListColumns | None
= ...,
+ get_database: pyspark.sql.connect.proto.catalog_pb2.GetDatabase | None
= ...,
+ get_table: pyspark.sql.connect.proto.catalog_pb2.GetTable | None = ...,
+ get_function: pyspark.sql.connect.proto.catalog_pb2.GetFunction | None
= ...,
+ database_exists: pyspark.sql.connect.proto.catalog_pb2.DatabaseExists
| None = ...,
+ table_exists: pyspark.sql.connect.proto.catalog_pb2.TableExists | None
= ...,
+ function_exists: pyspark.sql.connect.proto.catalog_pb2.FunctionExists
| None = ...,
+ create_external_table:
pyspark.sql.connect.proto.catalog_pb2.CreateExternalTable
+ | None = ...,
+ create_table: pyspark.sql.connect.proto.catalog_pb2.CreateTable | None
= ...,
+ drop_temp_view: pyspark.sql.connect.proto.catalog_pb2.DropTempView |
None = ...,
+ drop_global_temp_view:
pyspark.sql.connect.proto.catalog_pb2.DropGlobalTempView
+ | None = ...,
+ recover_partitions:
pyspark.sql.connect.proto.catalog_pb2.RecoverPartitions | None = ...,
+ clear_cache: pyspark.sql.connect.proto.catalog_pb2.ClearCache | None =
...,
+ refresh_table: pyspark.sql.connect.proto.catalog_pb2.RefreshTable |
None = ...,
+ refresh_by_path: pyspark.sql.connect.proto.catalog_pb2.RefreshByPath |
None = ...,
+ current_catalog: pyspark.sql.connect.proto.catalog_pb2.CurrentCatalog
| None = ...,
+ set_current_catalog:
pyspark.sql.connect.proto.catalog_pb2.SetCurrentCatalog | None = ...,
+ list_catalogs: pyspark.sql.connect.proto.catalog_pb2.ListCatalogs |
None = ...,
unknown: global___Unknown | None = ...,
) -> None: ...
def HasField(
@@ -203,28 +308,62 @@ class Relation(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"aggregate",
b"aggregate",
+ "clear_cache",
+ b"clear_cache",
"common",
b"common",
+ "create_external_table",
+ b"create_external_table",
+ "create_table",
+ b"create_table",
"crosstab",
b"crosstab",
+ "current_catalog",
+ b"current_catalog",
+ "current_database",
+ b"current_database",
+ "database_exists",
+ b"database_exists",
"deduplicate",
b"deduplicate",
"describe",
b"describe",
"drop",
b"drop",
+ "drop_global_temp_view",
+ b"drop_global_temp_view",
"drop_na",
b"drop_na",
+ "drop_temp_view",
+ b"drop_temp_view",
"fill_na",
b"fill_na",
"filter",
b"filter",
+ "function_exists",
+ b"function_exists",
+ "get_database",
+ b"get_database",
+ "get_function",
+ b"get_function",
+ "get_table",
+ b"get_table",
"hint",
b"hint",
"join",
b"join",
"limit",
b"limit",
+ "list_catalogs",
+ b"list_catalogs",
+ "list_columns",
+ b"list_columns",
+ "list_databases",
+ b"list_databases",
+ "list_functions",
+ b"list_functions",
+ "list_tables",
+ b"list_tables",
"local_relation",
b"local_relation",
"offset",
@@ -235,6 +374,12 @@ class Relation(google.protobuf.message.Message):
b"range",
"read",
b"read",
+ "recover_partitions",
+ b"recover_partitions",
+ "refresh_by_path",
+ b"refresh_by_path",
+ "refresh_table",
+ b"refresh_table",
"rel_type",
b"rel_type",
"rename_columns_by_name_to_name_map",
@@ -247,6 +392,10 @@ class Relation(google.protobuf.message.Message):
b"replace",
"sample",
b"sample",
+ "set_current_catalog",
+ b"set_current_catalog",
+ "set_current_database",
+ b"set_current_database",
"set_op",
b"set_op",
"show_string",
@@ -259,6 +408,8 @@ class Relation(google.protobuf.message.Message):
b"subquery_alias",
"summary",
b"summary",
+ "table_exists",
+ b"table_exists",
"tail",
b"tail",
"to_schema",
@@ -276,28 +427,62 @@ class Relation(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"aggregate",
b"aggregate",
+ "clear_cache",
+ b"clear_cache",
"common",
b"common",
+ "create_external_table",
+ b"create_external_table",
+ "create_table",
+ b"create_table",
"crosstab",
b"crosstab",
+ "current_catalog",
+ b"current_catalog",
+ "current_database",
+ b"current_database",
+ "database_exists",
+ b"database_exists",
"deduplicate",
b"deduplicate",
"describe",
b"describe",
"drop",
b"drop",
+ "drop_global_temp_view",
+ b"drop_global_temp_view",
"drop_na",
b"drop_na",
+ "drop_temp_view",
+ b"drop_temp_view",
"fill_na",
b"fill_na",
"filter",
b"filter",
+ "function_exists",
+ b"function_exists",
+ "get_database",
+ b"get_database",
+ "get_function",
+ b"get_function",
+ "get_table",
+ b"get_table",
"hint",
b"hint",
"join",
b"join",
"limit",
b"limit",
+ "list_catalogs",
+ b"list_catalogs",
+ "list_columns",
+ b"list_columns",
+ "list_databases",
+ b"list_databases",
+ "list_functions",
+ b"list_functions",
+ "list_tables",
+ b"list_tables",
"local_relation",
b"local_relation",
"offset",
@@ -308,6 +493,12 @@ class Relation(google.protobuf.message.Message):
b"range",
"read",
b"read",
+ "recover_partitions",
+ b"recover_partitions",
+ "refresh_by_path",
+ b"refresh_by_path",
+ "refresh_table",
+ b"refresh_table",
"rel_type",
b"rel_type",
"rename_columns_by_name_to_name_map",
@@ -320,6 +511,10 @@ class Relation(google.protobuf.message.Message):
b"replace",
"sample",
b"sample",
+ "set_current_catalog",
+ b"set_current_catalog",
+ "set_current_database",
+ b"set_current_database",
"set_op",
b"set_op",
"show_string",
@@ -332,6 +527,8 @@ class Relation(google.protobuf.message.Message):
b"subquery_alias",
"summary",
b"summary",
+ "table_exists",
+ b"table_exists",
"tail",
b"tail",
"to_schema",
@@ -378,6 +575,29 @@ class Relation(google.protobuf.message.Message):
"summary",
"crosstab",
"describe",
+ "current_database",
+ "set_current_database",
+ "list_databases",
+ "list_tables",
+ "list_functions",
+ "list_columns",
+ "get_database",
+ "get_table",
+ "get_function",
+ "database_exists",
+ "table_exists",
+ "function_exists",
+ "create_external_table",
+ "create_table",
+ "drop_temp_view",
+ "drop_global_temp_view",
+ "recover_partitions",
+ "clear_cache",
+ "refresh_table",
+ "refresh_by_path",
+ "current_catalog",
+ "set_current_catalog",
+ "list_catalogs",
"unknown",
] | None: ...
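
The catalog messages plug into `Relation`'s `rel_type` oneof, so setting one of the new fields selects it as the relation type. A small illustrative check of that wiring ("people" is a placeholder table name):

    from pyspark.sql.connect.proto import Relation, TableExists

    rel = Relation(table_exists=TableExists(table_name="people"))
    assert rel.WhichOneof("rel_type") == "table_exists"
    assert rel.HasField("table_exists")
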
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index f003a1244db..a220e3824ea 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -46,6 +46,7 @@ from typing import (
if TYPE_CHECKING:
from pyspark.sql.connect._typing import OptionalPrimitiveType
+ from pyspark.sql.connect.catalog import Catalog
class SparkSession(object):
@@ -120,6 +121,11 @@ class SparkSession(object):
# Parse the connection string.
self._client = SparkConnectClient(connectionString)
+ def table(self, tableName: str) -> DataFrame:
+ return self.read.table(tableName)
+
+ table.__doc__ = PySparkSession.table.__doc__
+
@property
def read(self) -> "DataFrameReader":
return DataFrameReader(self)
@@ -224,6 +230,16 @@ class SparkSession(object):
range.__doc__ = PySparkSession.__doc__
+ @property
+ def catalog(self) -> "Catalog":
+ from pyspark.sql.connect.catalog import Catalog
+
+ if not hasattr(self, "_catalog"):
+ self._catalog = Catalog(self)
+ return self._catalog
+
+ catalog.__doc__ = PySparkSession.catalog.__doc__
+
# SparkConnect-specific API
@property
def client(self) -> "SparkConnectClient":
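
A hedged end-to-end sketch of the client-side surface added here, following the environment-based setup used by the parity test further down (`sc://localhost` is a placeholder and assumes a Spark Connect server is reachable there):

    import os
    from pyspark.sql import SparkSession

    os.environ["SPARK_REMOTE"] = "sc://localhost"  # placeholder address, as in the parity test
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    print(spark.catalog.currentCatalog())
    print(spark.catalog.databaseExists("default"))
    df = spark.table("people")  # alias of spark.read.table("people"); "people" is a placeholder
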
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index de93b34a272..7ecb7e61fb6 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -666,6 +666,9 @@ class SparkSession(SparkConversionMixin):
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Returns
-------
:class:`Catalog`
@@ -1411,6 +1414,9 @@ class SparkSession(SparkConversionMixin):
.. versionadded:: 2.0.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
tableName : str
diff --git a/python/pyspark/sql/tests/connect/test_parity_catalog.py b/python/pyspark/sql/tests/connect/test_parity_catalog.py
new file mode 100644
index 00000000000..b832c5e1ef9
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_parity_catalog.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import os
+
+from pyspark.sql import SparkSession
+from pyspark.sql.tests.test_catalog import CatalogTestsMixin
+from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+
+
[email protected](not should_test_connect, connect_requirement_message)
+class CatalogParityTests(CatalogTestsMixin, ReusedSQLTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(CatalogParityTests, cls).setUpClass()
+ cls._spark = cls.spark # Assign existing Spark session to run the server
+ # Sets the remote address. Now, we create a remote Spark Session.
+ # Note that this is only allowed in testing.
+ os.environ["SPARK_REMOTE"] = "sc://localhost"
+ cls.spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
+
+ @classmethod
+ def tearDownClass(cls):
+ # TODO(SPARK-41529): Implement stop in RemoteSparkSession.
+ # Stop the regular Spark session (server) too.
+ cls.spark = cls._spark
+ super(CatalogParityTests, cls).tearDownClass()
+ del os.environ["SPARK_REMOTE"]
+
+ # TODO(SPARK-41612): Support Catalog.isCached
+ # TODO(SPARK-41600): Support Catalog.cacheTable
+ # TODO(SPARK-41623): Support Catalog.uncacheTable
+ @unittest.skip("Fails in Spark Connect, should enable.")
+ def test_table_cache(self):
+ super().test_table_cache()
+
+ # TODO(SPARK-41600): Support Catalog.cacheTable
+ @unittest.skip("Fails in Spark Connect, should enable.")
+ def test_refresh_table(self):
+ super().test_refresh_table()
+
+
+if __name__ == "__main__":
+ import unittest
+ from pyspark.sql.tests.connect.test_parity_catalog import * # noqa: F401
+
+ try:
+ import xmlrunner # type: ignore[import]
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
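
One way to exercise the new suite locally (a sketch of an assumed workflow, not something prescribed by the patch) is to load it through unittest directly; SPARK_REMOTE is set by setUpClass itself, so only a working local Spark build with the Connect server is needed:

    import unittest

    # Load and run the Connect parity suite added above; the verbosity mirrors
    # the runner settings used in the file itself.
    suite = unittest.defaultTestLoader.loadTestsFromName(
        "pyspark.sql.tests.connect.test_parity_catalog.CatalogParityTests"
    )
    unittest.TextTestRunner(verbosity=2).run(suite)
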
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 2eccfab72fd..d60d53e5877 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -16,20 +16,21 @@
#
from pyspark.sql.types import StructType, StructField, IntegerType
-from pyspark.sql.utils import AnalysisException
from pyspark.testing.sqlutils import ReusedSQLTestCase
-class CatalogTests(ReusedSQLTestCase):
+class CatalogTestsMixin:
def test_current_database(self):
spark = self.spark
with self.database("some_db"):
self.assertEqual(spark.catalog.currentDatabase(), "default")
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
spark.catalog.setCurrentDatabase("some_db")
self.assertEqual(spark.catalog.currentDatabase(), "some_db")
self.assertRaisesRegex(
- AnalysisException,
+ # TODO(SPARK-41715): Should catch specific exceptions for both
+ # Spark Connect and PySpark
+ Exception,
"does_not_exist",
lambda: spark.catalog.setCurrentDatabase("does_not_exist"),
)
@@ -39,7 +40,7 @@ class CatalogTests(ReusedSQLTestCase):
with self.database("some_db"):
databases = [db.name for db in spark.catalog.listDatabases()]
self.assertEqual(databases, ["default"])
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
databases = [db.name for db in spark.catalog.listDatabases()]
self.assertEqual(sorted(databases), ["default", "some_db"])
@@ -48,7 +49,7 @@ class CatalogTests(ReusedSQLTestCase):
spark = self.spark
with self.database("some_db"):
self.assertFalse(spark.catalog.databaseExists("some_db"))
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
self.assertTrue(spark.catalog.databaseExists("some_db"))
self.assertTrue(spark.catalog.databaseExists("spark_catalog.some_db"))
self.assertFalse(spark.catalog.databaseExists("spark_catalog.some_db2"))
@@ -56,7 +57,7 @@ class CatalogTests(ReusedSQLTestCase):
def test_get_database(self):
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
db = spark.catalog.getDatabase("spark_catalog.some_db")
self.assertEqual(db.name, "some_db")
self.assertEqual(db.catalog, "spark_catalog")
@@ -66,14 +67,16 @@ class CatalogTests(ReusedSQLTestCase):
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
with self.table("tab1", "some_db.tab2", "tab3_via_catalog"):
with self.tempView("temp_tab"):
self.assertEqual(spark.catalog.listTables(), [])
self.assertEqual(spark.catalog.listTables("some_db"), [])
spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
- spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
- spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet")
+ spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet").collect()
+ spark.sql(
+ "CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet"
+ ).collect()
schema = StructType([StructField("a", IntegerType(), True)])
description = "this a table created via Catalog.createTable()"
@@ -178,7 +181,7 @@ class CatalogTests(ReusedSQLTestCase):
)
)
self.assertRaisesRegex(
- AnalysisException,
+ Exception,
"does_not_exist",
lambda: spark.catalog.listTables("does_not_exist"),
)
@@ -186,7 +189,7 @@ class CatalogTests(ReusedSQLTestCase):
def test_list_functions(self):
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
functions = dict((f.name, f) for f in spark.catalog.listFunctions())
functionsDefault = dict((f.name, f) for f in spark.catalog.listFunctions("default"))
self.assertTrue(len(functions) > 200)
@@ -206,23 +209,28 @@ class CatalogTests(ReusedSQLTestCase):
self.assertEqual(functions, functionsDefault)
with self.function("func1", "some_db.func2"):
- spark.udf.register("temp_func", lambda x: str(x))
- spark.sql("CREATE FUNCTION func1 AS
'org.apache.spark.data.bricks'")
- spark.sql("CREATE FUNCTION some_db.func2 AS
'org.apache.spark.data.bricks'")
+ if hasattr(spark, "udf"):
+ spark.udf.register("temp_func", lambda x: str(x))
+ spark.sql("CREATE FUNCTION func1 AS
'org.apache.spark.data.bricks'").collect()
+ spark.sql(
+ "CREATE FUNCTION some_db.func2 AS
'org.apache.spark.data.bricks'"
+ ).collect()
newFunctions = dict((f.name, f) for f in
spark.catalog.listFunctions())
newFunctionsSomeDb = dict(
(f.name, f) for f in spark.catalog.listFunctions("some_db")
)
self.assertTrue(set(functions).issubset(set(newFunctions)))
self.assertTrue(set(functions).issubset(set(newFunctionsSomeDb)))
- self.assertTrue("temp_func" in newFunctions)
+ if hasattr(spark, "udf"):
+ self.assertTrue("temp_func" in newFunctions)
self.assertTrue("func1" in newFunctions)
self.assertTrue("func2" not in newFunctions)
- self.assertTrue("temp_func" in newFunctionsSomeDb)
+ if hasattr(spark, "udf"):
+ self.assertTrue("temp_func" in newFunctionsSomeDb)
self.assertTrue("func1" not in newFunctionsSomeDb)
self.assertTrue("func2" in newFunctionsSomeDb)
self.assertRaisesRegex(
- AnalysisException,
+ Exception,
"does_not_exist",
lambda: spark.catalog.listFunctions("does_not_exist"),
)
@@ -235,7 +243,7 @@ class CatalogTests(ReusedSQLTestCase):
self.assertFalse(spark.catalog.functionExists("default.func1"))
self.assertFalse(spark.catalog.functionExists("spark_catalog.default.func1"))
self.assertFalse(spark.catalog.functionExists("func1", "default"))
- spark.sql("CREATE FUNCTION func1 AS
'org.apache.spark.data.bricks'")
+ spark.sql("CREATE FUNCTION func1 AS
'org.apache.spark.data.bricks'").collect()
self.assertTrue(spark.catalog.functionExists("func1"))
self.assertTrue(spark.catalog.functionExists("default.func1"))
self.assertTrue(spark.catalog.functionExists("spark_catalog.default.func1"))
@@ -244,7 +252,7 @@ class CatalogTests(ReusedSQLTestCase):
def test_get_function(self):
spark = self.spark
with self.function("func1"):
- spark.sql("CREATE FUNCTION func1 AS
'org.apache.spark.data.bricks'")
+ spark.sql("CREATE FUNCTION func1 AS
'org.apache.spark.data.bricks'").collect()
func1 = spark.catalog.getFunction("spark_catalog.default.func1")
self.assertTrue(func1.name == "func1")
self.assertTrue(func1.namespace == ["default"])
@@ -257,12 +265,12 @@ class CatalogTests(ReusedSQLTestCase):
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
with self.table("tab1", "some_db.tab2"):
- spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING
parquet")
+ spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING
parquet").collect()
spark.sql(
"CREATE TABLE some_db.tab2 (nickname STRING, tolerance
FLOAT) USING parquet"
- )
+ ).collect()
columns = sorted(
spark.catalog.listColumns("spark_catalog.default.tab1"),
key=lambda c: c.name
)
@@ -319,11 +327,9 @@ class CatalogTests(ReusedSQLTestCase):
isBucket=False,
),
)
+ self.assertRaisesRegex(Exception, "tab2", lambda:
spark.catalog.listColumns("tab2"))
self.assertRaisesRegex(
- AnalysisException, "tab2", lambda:
spark.catalog.listColumns("tab2")
- )
- self.assertRaisesRegex(
- AnalysisException,
+ Exception,
"does_not_exist",
lambda: spark.catalog.listColumns("does_not_exist"),
)
@@ -331,9 +337,11 @@ class CatalogTests(ReusedSQLTestCase):
def test_table_cache(self):
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
with self.table("tab1"):
- spark.sql("CREATE TABLE some_db.tab1 (name STRING, age INT)
USING parquet")
+ spark.sql(
+ "CREATE TABLE some_db.tab1 (name STRING, age INT) USING
parquet"
+ ).collect()
self.assertFalse(spark.catalog.isCached("some_db.tab1"))
self.assertFalse(spark.catalog.isCached("spark_catalog.some_db.tab1"))
spark.catalog.cacheTable("spark_catalog.some_db.tab1")
@@ -347,16 +355,18 @@ class CatalogTests(ReusedSQLTestCase):
# SPARK-36176: testing that table_exists returns correct boolean
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
with self.table("tab1", "some_db.tab2"):
self.assertFalse(spark.catalog.tableExists("tab1"))
self.assertFalse(spark.catalog.tableExists("tab2", "some_db"))
- spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING
parquet")
+ spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING
parquet").collect()
self.assertTrue(spark.catalog.tableExists("tab1"))
self.assertTrue(spark.catalog.tableExists("default.tab1"))
self.assertTrue(spark.catalog.tableExists("spark_catalog.default.tab1"))
self.assertTrue(spark.catalog.tableExists("tab1", "default"))
- spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT)
USING parquet")
+ spark.sql(
+ "CREATE TABLE some_db.tab2 (name STRING, age INT) USING
parquet"
+ ).collect()
self.assertFalse(spark.catalog.tableExists("tab2"))
self.assertTrue(spark.catalog.tableExists("some_db.tab2"))
self.assertTrue(spark.catalog.tableExists("spark_catalog.some_db.tab2"))
@@ -365,9 +375,9 @@ class CatalogTests(ReusedSQLTestCase):
def test_get_table(self):
spark = self.spark
with self.database("some_db"):
- spark.sql("CREATE DATABASE some_db")
+ spark.sql("CREATE DATABASE some_db").collect()
with self.table("tab1"):
- spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING
parquet")
+ spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING
parquet").collect()
self.assertEqual(spark.catalog.getTable("tab1").database,
"default")
self.assertEqual(spark.catalog.getTable("default.tab1").catalog,
"spark_catalog")
self.assertEqual(spark.catalog.getTable("spark_catalog.default.tab1").name,
"tab1")
@@ -381,8 +391,8 @@ class CatalogTests(ReusedSQLTestCase):
with self.table("my_tab"):
spark.sql(
"CREATE TABLE my_tab (col STRING) USING TEXT LOCATION
'{}'".format(tmp_dir)
- )
- spark.sql("INSERT INTO my_tab SELECT 'abc'")
+ ).collect()
+ spark.sql("INSERT INTO my_tab SELECT 'abc'").collect()
spark.catalog.cacheTable("my_tab")
self.assertEqual(spark.table("my_tab").count(), 1)
@@ -393,6 +403,10 @@ class CatalogTests(ReusedSQLTestCase):
self.assertEqual(spark.table("my_tab").count(), 0)
+class CatalogTests(ReusedSQLTestCase):
+ pass
+
+
if __name__ == "__main__":
import unittest
from pyspark.sql.tests.test_catalog import * # noqa: F401
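
The recurring .collect() additions above exist because, at this point, spark.sql(...) on a Connect session only builds a plan and nothing runs until an action is triggered, while the classic session executes DDL eagerly. A small sketch of the pattern the shared tests now rely on (the helper name is illustrative, not from the patch):

    def create_database(spark, name: str) -> None:
        # Illustrative helper: .collect() forces the DDL to run on the Connect
        # server; against a classic SparkSession the statement has already run
        # eagerly and .collect() simply returns an empty result.
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {name}").collect()
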
diff --git a/python/pyspark/testing/sqlutils.py b/python/pyspark/testing/sqlutils.py
index 92fcd08091c..b5ad7b64f91 100644
--- a/python/pyspark/testing/sqlutils.py
+++ b/python/pyspark/testing/sqlutils.py
@@ -202,7 +202,7 @@ class SQLTestUtils:
yield
finally:
for db in databases:
- self.spark.sql("DROP DATABASE IF EXISTS %s CASCADE" % db)
+ self.spark.sql("DROP DATABASE IF EXISTS %s CASCADE" %
db).collect()
self.spark.catalog.setCurrentDatabase("default")
@contextmanager
@@ -217,7 +217,7 @@ class SQLTestUtils:
yield
finally:
for t in tables:
- self.spark.sql("DROP TABLE IF EXISTS %s" % t)
+ self.spark.sql("DROP TABLE IF EXISTS %s" % t).collect()
@contextmanager
def tempView(self, *views):
@@ -245,7 +245,7 @@ class SQLTestUtils:
yield
finally:
for f in functions:
- self.spark.sql("DROP FUNCTION IF EXISTS %s" % f)
+ self.spark.sql("DROP FUNCTION IF EXISTS %s" % f).collect()
@staticmethod
def assert_close(a, b):
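
For completeness, a hypothetical test built on these helpers would look roughly like the following; MyCatalogTest and the object names are illustrative, and the trailing .collect() calls follow the same convention as the cleanup code above:

    import unittest

    from pyspark.testing.sqlutils import ReusedSQLTestCase


    class MyCatalogTest(ReusedSQLTestCase):  # hypothetical test class
        def test_demo_table(self):
            spark = self.spark
            # The context managers guarantee DROP ... IF EXISTS cleanup even
            # if the assertions below fail.
            with self.database("demo_db"), self.table("demo_db.t"):
                spark.sql("CREATE DATABASE demo_db").collect()
                spark.sql("CREATE TABLE demo_db.t (id INT) USING parquet").collect()
                self.assertTrue(spark.catalog.tableExists("demo_db.t"))


    if __name__ == "__main__":
        unittest.main(verbosity=2)
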
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]